diff --git a/README.md b/README.md index cd749ef3..f2644dd7 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ * **Message delivery**: It's guaranteed that a message is consumed **at least once**. (Message would be consumed by a worker more than once due to the failure in the underlying worker/restart-process etc, otherwise exactly one delivery) * **Redis cluster** : Redis cluster can be used with driver. * **Metrics** : In flight messages, waiting for consumption and delayed messages -* **Web interface**: a web interface to manage a queue and queue insights including latency +* **Web Dashboard**: a web dashboard to manage a queue and queue insights including latency * **Automatic message serialization and deserialization** * **Concurrency**: Concurrency of any queue can be configured * **Queue Priority** : @@ -28,6 +28,7 @@ * **Callbacks** : Callbacks for dead letter queue, discard etc * **Events** 1. Bootstrap event 2. Task execution event. * **Unique message** : Unique message processing for a queue based on the message id +* **Periodic message** : Process same message at certain interval * **Redis connection**: A different redis setup can be used for Rqueue ## Getting Started @@ -117,6 +118,12 @@ public class MessageService { public void sendSms(Sms sms, SmsPriority priority){ rqueueMessageEnqueuer.enqueueWithPriority("sms-queue", priority.value(), sms); } + + // enqueue periodic job, email should be sent every 30 seconds + public void sendPeriodicEmail(Email email){ + rqueueMessageEnqueuer.enqueuePeriodic("email-queue", invoice, 30_000); + } + } ``` diff --git a/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/MessageListener.java b/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/MessageListener.java index d5548cef..f5185325 100644 --- a/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/MessageListener.java +++ b/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/MessageListener.java @@ -25,6 +25,7 @@ import com.github.sonus21.rqueue.test.dto.FeedGeneration; import com.github.sonus21.rqueue.test.dto.Job; import com.github.sonus21.rqueue.test.dto.Notification; +import com.github.sonus21.rqueue.test.dto.PeriodicJob; import com.github.sonus21.rqueue.test.dto.Reservation; import com.github.sonus21.rqueue.test.dto.ReservationRequest; import com.github.sonus21.rqueue.test.dto.Sms; @@ -36,6 +37,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; @@ -49,13 +51,46 @@ public class MessageListener { @NonNull private ConsumedMessageService consumedMessageService; @NonNull private FailureManager failureManager; + @Value("${job.queue.name}") + private String jobQueue; + + @Value("${notification.queue.name}") + private String notificationQueueName; + + @Value("${email.queue.name}") + private String emailQueue; + + @Value("${sms.queue}") + private String smsQueue; + + @Value("${chat.indexing.queue}") + private String chatIndexingQueue; + + @Value("${feed.generation.queue}") + private String feedGenerationQueue; + + @Value("${reservation.queue}") + private String reservationQueue; + + @Value("${reservation.request.queue.name}") + private String reservationRequestQueue; + + @Value("${reservation.request.dead.letter.queue.name}") + private String reservationRequestDeadLetterQueue; + + @Value("${list.email.queue.name}") + private String listEmailQueue; + + @Value("${periodic.job.queue.name}") + private String periodicJobQueue; + @RqueueListener(value = "${job.queue.name}", active = "${job.queue.active}") public void onMessage(Job job) throws Exception { log.info("Job: {}", job); if (failureManager.shouldFail(job.getId())) { throw new Exception("Failing job task to be retried" + job); } - consumedMessageService.save(job, null); + consumedMessageService.save(job, null, jobQueue); } @RqueueListener( @@ -69,7 +104,7 @@ public void onMessage( if (failureManager.shouldFail(notification.getId())) { throw new Exception("Failing notification task to be retried" + notification); } - consumedMessageService.save(notification, null); + consumedMessageService.save(notification, null, notificationQueueName); } @RqueueListener( @@ -84,7 +119,7 @@ public void onMessage(Email email, @Header(RqueueMessageHeaders.MESSAGE) RqueueM if (failureManager.shouldFail(email.getId())) { throw new Exception("Failing email task to be retried" + email); } - consumedMessageService.save(email, null); + consumedMessageService.save(email, null, emailQueue); } @RqueueListener( @@ -98,7 +133,7 @@ public void onMessage(Sms sms) throws Exception { if (failureManager.shouldFail(sms.getId())) { throw new Exception("Failing sms task to be retried" + sms); } - consumedMessageService.save(sms, null); + consumedMessageService.save(sms, null, smsQueue); } @RqueueListener( @@ -112,7 +147,7 @@ public void onMessage(ChatIndexing chatIndexing) throws Exception { if (failureManager.shouldFail(chatIndexing.getId())) { throw new Exception("Failing chat indexing task to be retried" + chatIndexing); } - consumedMessageService.save(chatIndexing, null); + consumedMessageService.save(chatIndexing, null, chatIndexingQueue); } @RqueueListener( @@ -126,7 +161,7 @@ public void onMessage(FeedGeneration feedGeneration) throws Exception { if (failureManager.shouldFail(feedGeneration.getId())) { throw new Exception("Failing feedGeneration task to be retried" + feedGeneration); } - consumedMessageService.save(feedGeneration, null); + consumedMessageService.save(feedGeneration, null, feedGenerationQueue); } @RqueueListener( @@ -140,7 +175,7 @@ public void onMessage(Reservation reservation) throws Exception { if (failureManager.shouldFail(reservation.getId())) { throw new Exception("Failing reservation task to be retried" + reservation); } - consumedMessageService.save(reservation, null); + consumedMessageService.save(reservation, null, reservationQueue); } @RqueueListener( @@ -154,7 +189,7 @@ public void onMessageReservationRequest(ReservationRequest request) throws Excep if (failureManager.shouldFail(request.getId())) { throw new Exception("Failing reservation request task to be retried" + request); } - consumedMessageService.save(request, null); + consumedMessageService.save(request, null, reservationRequestQueue); } @RqueueListener( @@ -164,7 +199,8 @@ public void onMessageReservationRequest(ReservationRequest request) throws Excep public void onMessageReservationRequestDeadLetterQueue(ReservationRequest request) throws Exception { log.info("ReservationRequest Dead Letter Queue{}", request); - consumedMessageService.save(request, "reservation-request-dlq"); + consumedMessageService.save( + request, "reservation-request-dlq", reservationRequestDeadLetterQueue); } @RqueueListener(value = "${list.email.queue.name}", active = "${list.email.queue.enabled}") @@ -172,7 +208,20 @@ public void onMessageEmailList(List emailList) throws JsonProcessingExcep log.info("onMessageEmailList {}", emailList); String consumedId = UUID.randomUUID().toString(); for (Email email : emailList) { - consumedMessageService.save(email, consumedId); + consumedMessageService.save(email, consumedId, listEmailQueue); + } + } + + @RqueueListener( + value = "${periodic.job.queue.name}", + active = "${periodic.job.queue.active}", + deadLetterQueue = "${periodic.job.dead.letter.queue.name}", + numRetries = "${periodic.job.queue.retry.count}") + public void onPeriodicJob(PeriodicJob periodicJob) throws Exception { + log.info("onPeriodicJob: {}", periodicJob); + if (failureManager.shouldFail(periodicJob.getId())) { + throw new Exception("Failing PeriodicJob task to be retried" + periodicJob); } + consumedMessageService.save(periodicJob, UUID.randomUUID().toString(), periodicJobQueue); } } diff --git a/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/common/SpringTestBase.java b/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/common/SpringTestBase.java index b20cc2ee..a36ccb74 100644 --- a/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/common/SpringTestBase.java +++ b/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/common/SpringTestBase.java @@ -29,6 +29,7 @@ import com.github.sonus21.rqueue.core.support.RqueueMessageUtils; import com.github.sonus21.rqueue.listener.QueueDetail; import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer; +import com.github.sonus21.rqueue.test.entity.ConsumedMessage; import com.github.sonus21.rqueue.test.service.ConsumedMessageService; import com.github.sonus21.rqueue.test.service.FailureManager; import com.github.sonus21.rqueue.utils.StringUtils; @@ -101,6 +102,9 @@ public abstract class SpringTestBase extends TestBase { @Value("${list.email.queue.name}") protected String listEmailQueue; + @Value("${periodic.job.queue.name}") + protected String periodicJobQueue; + protected void enqueue(Object message, String queueName) { RqueueMessage rqueueMessage = RqueueMessageUtils.buildMessage( @@ -185,6 +189,13 @@ protected void printQueueStats(String queueName) { printQueueStats(Collections.singletonList(queueName)); } + protected void printConsumedMessage(String queueName) { + for (ConsumedMessage consumedMessage : + consumedMessageService.getConsumedMessagesForQueue(queueName)) { + log.info("Queue {} Msg: {}", queueName, consumedMessage); + } + } + protected void cleanQueue(String queue) { QueueDetail queueDetail = EndpointRegistry.get(queue); stringRqueueRedisTemplate.delete(queueDetail.getQueueName()); diff --git a/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/dto/PeriodicJob.java b/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/dto/PeriodicJob.java new file mode 100644 index 00000000..d57344d1 --- /dev/null +++ b/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/dto/PeriodicJob.java @@ -0,0 +1,37 @@ +/* + * Copyright 2020 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.sonus21.rqueue.test.dto; + +import java.util.UUID; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import org.apache.commons.lang3.RandomStringUtils; + +@Data +@NoArgsConstructor +@EqualsAndHashCode(callSuper = true) +public class PeriodicJob extends BaseQueueMessage { + private String jobName; + + public static PeriodicJob newInstance() { + PeriodicJob job = new PeriodicJob(); + job.setId(UUID.randomUUID().toString()); + job.setJobName(RandomStringUtils.randomAlphabetic(10)); + return job; + } +} diff --git a/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/entity/ConsumedMessage.java b/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/entity/ConsumedMessage.java index 67568801..21825469 100644 --- a/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/entity/ConsumedMessage.java +++ b/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/entity/ConsumedMessage.java @@ -18,13 +18,17 @@ import javax.persistence.Column; import javax.persistence.Entity; +import javax.persistence.GeneratedValue; import javax.persistence.Id; +import javax.persistence.Table; +import javax.persistence.UniqueConstraint; import lombok.AllArgsConstructor; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; import lombok.ToString; +import org.hibernate.annotations.GenericGenerator; @AllArgsConstructor @NoArgsConstructor @@ -33,12 +37,32 @@ @ToString @EqualsAndHashCode @Entity +@Table( + name = "consumed_messages", + uniqueConstraints = {@UniqueConstraint(columnNames = {"message_id", "tag"})}) public class ConsumedMessage { - @Id private String id; + + @Id + @GeneratedValue(generator = "UUID") + @GenericGenerator(name = "UUID", strategy = "org.hibernate.id.UUIDGenerator") + @Column + private String id; + + @Column(name = "message_id") + private String messageId; + + @Column(name = "tag") + private String tag; + + private String queueName; // Around 1 MB of data @Column(length = 1000000) private String message; - @Column private String tag; + @Column private Long createdAt; + + public ConsumedMessage(String messageId, String tag, String queueName, String message) { + this(null, messageId, tag, queueName, message, System.currentTimeMillis()); + } } diff --git a/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/repository/ConsumedMessageRepository.java b/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/repository/ConsumedMessageRepository.java index 04019f66..b0126ad6 100644 --- a/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/repository/ConsumedMessageRepository.java +++ b/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/repository/ConsumedMessageRepository.java @@ -17,6 +17,16 @@ package com.github.sonus21.rqueue.test.repository; import com.github.sonus21.rqueue.test.entity.ConsumedMessage; +import java.util.Collection; +import java.util.List; import org.springframework.data.repository.CrudRepository; -public interface ConsumedMessageRepository extends CrudRepository {} +public interface ConsumedMessageRepository extends CrudRepository { + List findByQueueName(String queueName); + + List findByMessageId(String messageId); + + List findByMessageIdIn(Collection messageIds); + + ConsumedMessage findByMessageIdAndTag(String messageId, String tag); +} diff --git a/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/service/ConsumedMessageService.java b/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/service/ConsumedMessageService.java index 05a85eb3..d8718a5e 100644 --- a/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/service/ConsumedMessageService.java +++ b/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/service/ConsumedMessageService.java @@ -21,9 +21,11 @@ import com.github.sonus21.rqueue.test.dto.BaseQueueMessage; import com.github.sonus21.rqueue.test.entity.ConsumedMessage; import com.github.sonus21.rqueue.test.repository.ConsumedMessageRepository; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import lombok.NonNull; import lombok.RequiredArgsConstructor; @@ -33,27 +35,29 @@ @Service @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class ConsumedMessageService { - @NonNull private ConsumedMessageRepository consumedMessageRepository; - @NonNull private ObjectMapper objectMapper; + @NonNull private final ConsumedMessageRepository consumedMessageRepository; + @NonNull private final ObjectMapper objectMapper; - public ConsumedMessage save(BaseQueueMessage message, String tag) throws JsonProcessingException { + public ConsumedMessage save(BaseQueueMessage message, String tag, String queueName) + throws JsonProcessingException { String textMessage = objectMapper.writeValueAsString(message); - ConsumedMessage consumedMessage = new ConsumedMessage(message.getId(), textMessage, tag); + ConsumedMessage consumedMessage = + new ConsumedMessage(message.getId(), tag, queueName, textMessage); consumedMessageRepository.save(consumedMessage); return consumedMessage; } - public Collection getConsumedMessages(Collection ids) { - return getMessages(ids).values(); + public Collection getConsumedMessages(Collection messageIds) { + return getMessages(messageIds).values(); } - public T getMessage(String id, Class tClass) { - return getMessages(Collections.singletonList(id), tClass).get(id); + public T getMessage(String messageId, Class tClass) { + return getMessages(Collections.singletonList(messageId), tClass).get(messageId); } - public Map getMessages(Collection ids, Class tClass) { + public Map getMessages(Collection messageIds, Class tClass) { Map idToMessage = new HashMap<>(); - getMessages(ids) + getMessages(messageIds) .values() .forEach( consumedMessage -> { @@ -67,15 +71,31 @@ public Map getMessages(Collection ids, Class tClass) { return idToMessage; } - public Map getMessages(Collection ids) { - Iterable consumedMessages = consumedMessageRepository.findAllById(ids); + public Map getMessages(Collection messageIds) { + Iterable consumedMessages = + consumedMessageRepository.findByMessageIdIn(messageIds); Map idToMessage = new HashMap<>(); consumedMessages.forEach( consumedMessage -> idToMessage.put(consumedMessage.getId(), consumedMessage)); return idToMessage; } - public ConsumedMessage getConsumedMessage(String id) { - return consumedMessageRepository.findById(id).orElse(null); + public ConsumedMessage getConsumedMessage(String messageId) { + List messages = getConsumedMessages(messageId); + if (messages.size() == 0) { + return null; + } + if (messages.size() == 1) { + return messages.get(0); + } + throw new IllegalStateException("more than one record found"); + } + + public List getConsumedMessages(String messageId) { + return new ArrayList<>(consumedMessageRepository.findByMessageId(messageId)); + } + + public List getConsumedMessagesForQueue(String queueName) { + return new ArrayList<>(consumedMessageRepository.findByQueueName(queueName)); } } diff --git a/rqueue-common-test/src/main/resources/application.properties b/rqueue-common-test/src/main/resources/application.properties index e3bf246b..81948a43 100644 --- a/rqueue-common-test/src/main/resources/application.properties +++ b/rqueue-common-test/src/main/resources/application.properties @@ -41,5 +41,9 @@ reservation.request.active=false reservation.request.dead.letter.queue.retry.count=1 list.email.queue.name=email-list-queue list.email.queue.enabled=false +periodic.job.queue.name=periodic-job-queue +periodic.job.dead.letter.queue.name=periodic-job-dlq +periodic.job.queue.retry.count=3 +periodic.job.queue.active=false diff --git a/rqueue-common-test/src/main/resources/logback.xml b/rqueue-common-test/src/main/resources/logback.xml index 1b7b7617..c6d4fcb9 100644 --- a/rqueue-common-test/src/main/resources/logback.xml +++ b/rqueue-common-test/src/main/resources/logback.xml @@ -28,7 +28,7 @@ - + diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RedisScriptFactory.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RedisScriptFactory.java index 594ee4c9..0086809e 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RedisScriptFactory.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RedisScriptFactory.java @@ -38,6 +38,7 @@ public static RedisScript getScript(ScriptType type) { case MOVE_MESSAGE_LIST_TO_ZSET: case MOVE_MESSAGE_ZSET_TO_ZSET: case MOVE_MESSAGE_ZSET_TO_LIST: + case SCHEDULE_MESSAGE: script.setResultType(Long.class); return script; case DEQUEUE_MESSAGE: @@ -56,7 +57,8 @@ public enum ScriptType { MOVE_MESSAGE_LIST_TO_LIST("scripts/move_message_list_to_list.lua"), MOVE_MESSAGE_LIST_TO_ZSET("scripts/move_message_list_to_zset.lua"), MOVE_MESSAGE_ZSET_TO_ZSET("scripts/move_message_zset_to_zset.lua"), - MOVE_MESSAGE_ZSET_TO_LIST("scripts/move_message_zset_to_list.lua"); + MOVE_MESSAGE_ZSET_TO_LIST("scripts/move_message_zset_to_list.lua"), + SCHEDULE_MESSAGE("scripts/schedule_message.lua"); private final String path; ScriptType(String path) { diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessage.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessage.java index 0ea6c5b2..24269499 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessage.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessage.java @@ -32,8 +32,8 @@ @NoArgsConstructor @AllArgsConstructor @JsonPropertyOrder({"failureCount"}) -@Builder -public class RqueueMessage extends SerializableBase implements Cloneable { +@Builder(toBuilder = true) +public class RqueueMessage extends SerializableBase { private static final long serialVersionUID = -3488860960637488519L; // The message id, each message has a unique id @@ -60,12 +60,6 @@ public void updateReEnqueuedAt() { reEnqueuedAt = System.currentTimeMillis(); } - @Override - @SuppressWarnings("squid:S2975") - public RqueueMessage clone() throws CloneNotSupportedException { - return (RqueueMessage) super.clone(); - } - @Override public boolean equals(Object other) { if (other instanceof RqueueMessage) { @@ -77,6 +71,19 @@ public boolean equals(Object other) { return false; } + @JsonIgnore + public String getPseudoId() { + if (!isPeriodicTask()) { + return id; + } + return id + "::sch::" + processAt; + } + + @JsonIgnore + public long nextProcessAt() { + return processAt + period; + } + @JsonIgnore public boolean isPeriodicTask() { return period > 0; diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageEnqueuer.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageEnqueuer.java index d588177a..3aa4549f 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageEnqueuer.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageEnqueuer.java @@ -612,6 +612,33 @@ default boolean enqueueUniqueAtWithPriority( */ String enqueuePeriodic(String queueName, Object message, long periodInMilliSeconds); + /** + * Enqueue a message on given queue that will be running after a given period. It works like + * periodic cron that's scheduled at certain interval, for example every 30 seconds. + * + * @param queueName on which queue message has to be send + * @param message message object it could be any arbitrary object. + * @param period period of this job + * @param unit period unit + * @return message id on successful enqueue otherwise null. + */ + default String enqueuePeriodic(String queueName, Object message, long period, TimeUnit unit) { + return enqueuePeriodic(queueName, message, unit.toMillis(period)); + } + + /** + * Enqueue a message on given queue that will be running after a given period. It works like + * periodic cron that's scheduled at certain interval, for example every 30 seconds. + * + * @param queueName on which queue message has to be send + * @param message message object it could be any arbitrary object. + * @param period job period + * @return message id on successful enqueue otherwise null. + */ + default String enqueuePeriodic(String queueName, Object message, Duration period) { + return enqueuePeriodic(queueName, message, period.toMillis()); + } + /** * Enqueue a message on given queue that will be running after a given period. It works like * periodic cron that's scheduled at certain interval, for example every 30 seconds. @@ -620,8 +647,39 @@ default boolean enqueueUniqueAtWithPriority( * @param messageId message id corresponding to this message * @param message message object it could be any arbitrary object. * @param periodInMilliSeconds period of this job in milliseconds. - * @return message id on successful enqueue otherwise null. + * @return success or failure */ boolean enqueuePeriodic( String queueName, String messageId, Object message, long periodInMilliSeconds); + + /** + * Enqueue a message on given queue that will be running after a given period. It works like + * periodic cron that's scheduled at certain interval, for example every 30 seconds. + * + * @param queueName on which queue message has to be send + * @param messageId message id corresponding to this message + * @param message message object it could be any arbitrary object. + * @param period period of this job . + * @param unit unit of this period + * @return success or failure + */ + default boolean enqueuePeriodic( + String queueName, String messageId, Object message, long period, TimeUnit unit) { + return enqueuePeriodic(queueName, messageId, message, unit.toMillis(period)); + } + + /** + * Enqueue a message on given queue that will be running after a given period. It works like + * periodic cron that's scheduled at certain interval, for example every 30 seconds. + * + * @param queueName on which queue message has to be send + * @param messageId message id corresponding to this message + * @param message message object it could be any arbitrary object. + * @param period period of this job . + * @return success or failure + */ + default boolean enqueuePeriodic( + String queueName, String messageId, Object message, Duration period) { + return enqueuePeriodic(queueName, messageId, message, period.toMillis()); + } } diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java index a21fd47d..99c7fda6 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java @@ -64,4 +64,6 @@ MessageMoveResult moveMessageZsetToZset( List> readFromZsetWithScore(String name, long start, long end); Long getScore(String delayedQueueName, RqueueMessage rqueueMessage); + + void scheduleMessage(String queueName, RqueueMessage rqueueMessage, long expiryInMilliSeconds); } diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageTemplateImpl.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageTemplateImpl.java index 700216fd..733b4b7e 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageTemplateImpl.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageTemplateImpl.java @@ -220,6 +220,18 @@ public Long getScore(String delayedQueueName, RqueueMessage rqueueMessage) { return score.longValue(); } + @Override + public void scheduleMessage( + String zsetName, RqueueMessage rqueueMessage, long expiryInMilliSeconds) { + RedisScript script = (RedisScript) getScript(ScriptType.SCHEDULE_MESSAGE); + scriptExecutor.execute( + script, + Arrays.asList(rqueueMessage.getPseudoId(), zsetName), + expiryInMilliSeconds, + rqueueMessage, + rqueueMessage.getProcessAt()); + } + @Override public List readFromList(String name, long start, long end) { List messages = lrange(name, start, end); diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/PostProcessingHandler.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/PostProcessingHandler.java index 5effad7b..8e09f411 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/PostProcessingHandler.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/PostProcessingHandler.java @@ -133,14 +133,12 @@ void handle( } private void handleOldMessage(QueueDetail queueDetail, RqueueMessage rqueueMessage) { - if (isDebugEnabled()) { - log( - Level.DEBUG, - "Message {} ignored due to new message, Queue: {}", - null, - rqueueMessage, - queueDetail.getName()); - } + log( + Level.DEBUG, + "Message {} ignored due to new message, Queue: {}", + null, + rqueueMessage, + queueDetail.getName()); rqueueMessageTemplate.removeElementFromZset( queueDetail.getProcessingQueueName(), rqueueMessage); } @@ -206,7 +204,9 @@ private void moveMessageToQueue( byte[] processingQueueNameBytes = keySerializer.serialize(queueDetail.getProcessingQueueName()); byte[] queueNameBytes = keySerializer.serialize(queueName); + assert queueNameBytes != null; connection.rPush(queueNameBytes, newMessageBytes); + assert processingQueueNameBytes != null; connection.zRem(processingQueueNameBytes, oldMessageBytes); }); } @@ -242,18 +242,14 @@ private void moveMessageToDlq( Object userMessage, MessageMetadata messageMetadata, int failureCount, - long jobExecutionStartTime) - throws CloneNotSupportedException { - if (isWarningEnabled()) { - log( - Level.WARN, - "Message {} Moved to dead letter queue: {}", - null, - userMessage, - queueDetail.getDeadLetterQueueName()); - } - RqueueMessage newMessage = rqueueMessage.clone(); - newMessage.setFailureCount(failureCount); + long jobExecutionStartTime) { + log( + Level.WARN, + "Message {} Moved to dead letter queue: {}", + null, + userMessage, + queueDetail.getDeadLetterQueueName()); + RqueueMessage newMessage = rqueueMessage.toBuilder().failureCount(failureCount).build(); newMessage.updateReEnqueuedAt(); moveMessageForReprocessingOrDlq(queueDetail, rqueueMessage, newMessage, userMessage); publishEvent( @@ -271,13 +267,9 @@ private void parkMessageForRetry( MessageMetadata messageMetadata, int failureCount, long jobExecutionStartTime, - long delay) - throws CloneNotSupportedException { - if (isDebugEnabled()) { - log(Level.DEBUG, "Message {} will be retried in {}Ms", null, userMessage, delay); - } - RqueueMessage newMessage = rqueueMessage.clone(); - newMessage.setFailureCount(failureCount); + long delay) { + log(Level.DEBUG, "Message {} will be retried in {}Ms", null, userMessage, delay); + RqueueMessage newMessage = rqueueMessage.toBuilder().failureCount(failureCount).build(); newMessage.updateReEnqueuedAt(); rqueueMessageTemplate.moveMessage( queueDetail.getProcessingQueueName(), @@ -295,9 +287,7 @@ private void discardMessage( MessageMetadata messageMetadata, int failureCount, long jobExecutionStartTime) { - if (isDebugEnabled()) { - log(Level.DEBUG, "Message {} discarded due to retry limit exhaust", null, userMessage); - } + log(Level.DEBUG, "Message {} discarded due to retry limit exhaust", null, userMessage); deleteMessage( queueDetail, rqueueMessage, @@ -315,9 +305,7 @@ private void handleManualDeletion( MessageMetadata messageMetadata, int failureCount, long jobExecutionStartTime) { - if (isDebugEnabled()) { - log(Level.DEBUG, "Message Deleted {} successfully", null, rqueueMessage); - } + log(Level.DEBUG, "Message Deleted {} successfully", null, rqueueMessage); deleteMessage( queueDetail, rqueueMessage, @@ -335,9 +323,7 @@ private void handleSuccessFullExecution( MessageMetadata messageMetadata, int failureCount, long jobExecutionStartTime) { - if (isDebugEnabled()) { - log(Level.DEBUG, "Message consumed {} successfully", null, rqueueMessage); - } + log(Level.DEBUG, "Message consumed {} successfully", null, rqueueMessage); deleteMessage( queueDetail, rqueueMessage, @@ -354,8 +340,7 @@ private void handleRetryExceededMessage( Object userMessage, MessageMetadata messageMetadata, int failureCount, - long jobExecutionStartTime) - throws CloneNotSupportedException { + long jobExecutionStartTime) { if (queueDetail.isDlqSet()) { moveMessageToDlq( queueDetail, @@ -387,8 +372,7 @@ private void handleFailure( Object userMessage, MessageMetadata messageMetadata, int failureCount, - long jobExecutionStartTime) - throws CloneNotSupportedException { + long jobExecutionStartTime) { int maxRetryCount = getMaxRetryCount(rqueueMessage, queueDetail); if (failureCount < maxRetryCount) { long delay = taskExecutionBackoff.nextBackOff(userMessage, rqueueMessage, failureCount); @@ -428,9 +412,7 @@ private void handleIgnoredMessage( MessageMetadata messageMetadata, int failureCount, long jobExecutionStartTime) { - if (isDebugEnabled()) { - log(Level.DEBUG, "Message {} ignored, Queue: {}", null, rqueueMessage, queueDetail.getName()); - } + log(Level.DEBUG, "Message {} ignored, Queue: {}", null, rqueueMessage, queueDetail.getName()); deleteMessage( queueDetail, rqueueMessage, diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java index 4f44d891..c5ab6161 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java @@ -17,6 +17,7 @@ package com.github.sonus21.rqueue.listener; import static com.github.sonus21.rqueue.utils.Constants.DELTA_BETWEEN_RE_ENQUEUE_TIME; +import static com.github.sonus21.rqueue.utils.Constants.SECONDS_IN_A_DAY; import com.github.sonus21.rqueue.config.RqueueConfig; import com.github.sonus21.rqueue.core.RqueueMessage; @@ -185,8 +186,7 @@ private void logExecutionTimeWarning( } } - @Override - void start() { + private void processSimpleMessage() { int failureCount = rqueueMessage.getFailureCount(); long maxProcessingTime = getMaxProcessingTime(); long startTime = System.currentTimeMillis(); @@ -229,4 +229,21 @@ void start() { semaphore.release(); } } + + private void processScheduledMessage() { + RqueueMessage newMessage = + rqueueMessage.toBuilder().processAt(rqueueMessage.nextProcessAt()).build(); + getRqueueMessageTemplate() + .scheduleMessage(queueDetail.getDelayedQueueName(), newMessage, SECONDS_IN_A_DAY); + processSimpleMessage(); + } + + @Override + void start() { + if (rqueueMessage.isPeriodicTask()) { + processScheduledMessage(); + } else { + processSimpleMessage(); + } + } } diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/utils/ValueResolver.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/utils/ValueResolver.java index b7dd5b14..fd2bd7dc 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/utils/ValueResolver.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/utils/ValueResolver.java @@ -62,7 +62,9 @@ public static Integer parseStringToInt(String val) { public static boolean convertToBoolean(String s) { String tmpString = clean(s); - if (tmpString.equalsIgnoreCase("true")) { + if (tmpString.equalsIgnoreCase("true") + || tmpString.equals("1") + || tmpString.equalsIgnoreCase("yes")) { return true; } if (tmpString.equalsIgnoreCase("false")) { diff --git a/rqueue-core/src/main/resources/scripts/schedule_message.lua b/rqueue-core/src/main/resources/scripts/schedule_message.lua new file mode 100644 index 00000000..a22b0fe6 --- /dev/null +++ b/rqueue-core/src/main/resources/scripts/schedule_message.lua @@ -0,0 +1,11 @@ +-- get current value +local value = redis.call('GET', KEYS[1]) + +if value == nil then + redis.call('SET', KEYS[1], "1", "EX", ARGV[1]) + redis.call('ZADD', KEYS[2], ARGV[3], ARGV[2]) + return 1 +end + +return 0 + diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/MessageSchedulingTest.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/MessageSchedulingTest.java new file mode 100644 index 00000000..aa4b7036 --- /dev/null +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/MessageSchedulingTest.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.sonus21.rqueue.spring.boot.tests.integration; + +public class MessageSchedulingTest { + +} diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/PeriodicMessageTest.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/PeriodicMessageTest.java new file mode 100644 index 00000000..e02d075e --- /dev/null +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/PeriodicMessageTest.java @@ -0,0 +1,82 @@ +/* + * Copyright 2020 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.sonus21.rqueue.spring.boot.tests.integration; + +import com.github.sonus21.junit.SpringTestTracerExtension; +import com.github.sonus21.rqueue.exception.TimedOutException; +import com.github.sonus21.rqueue.spring.boot.application.ApplicationWithCustomConfiguration; +import com.github.sonus21.rqueue.test.common.SpringTestBase; +import com.github.sonus21.rqueue.test.dto.PeriodicJob; +import com.github.sonus21.rqueue.utils.TimeoutUtils; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.TestPropertySource; + +@SpringBootTest +@ExtendWith(SpringTestTracerExtension.class) +@ContextConfiguration(classes = ApplicationWithCustomConfiguration.class) +@Slf4j +@TestPropertySource( + properties = { + "spring.redis.port=8013", + "mysql.db.name=PeriodicMessageTest", + "rqueue.metrics.count.failure=false", + "rqueue.metrics.count.execution=false", + "periodic.job.queue.active=true" + }) +public class PeriodicMessageTest extends SpringTestBase { + @Test + public void testSimplePeriodicMessage() throws TimedOutException { + PeriodicJob job = PeriodicJob.newInstance(); + String messageId = + rqueueMessageEnqueuer.enqueuePeriodic(periodicJobQueue, job, Duration.ofSeconds(2)); + TimeoutUtils.waitFor( + () -> { + printQueueStats(periodicJobQueue); + printConsumedMessage(periodicJobQueue); + return consumedMessageService.getConsumedMessages(job.getId()).size() > 1; + }, + "at least two execution"); + rqueueMessageManager.deleteMessage(periodicJobQueue, messageId); + } + + @Test + public void testPeriodicMessageWithTimeUnit() throws TimedOutException { + PeriodicJob job = PeriodicJob.newInstance(); + String messageId = + rqueueMessageEnqueuer.enqueuePeriodic(periodicJobQueue, job, 2000, TimeUnit.MILLISECONDS); + TimeoutUtils.waitFor( + () -> consumedMessageService.getConsumedMessages(job.getId()).size() > 1, + "at least two execution"); + rqueueMessageManager.deleteMessage(periodicJobQueue, messageId); + } + + @Test + public void testPeriodicMessageMilliseconds() throws TimedOutException { + PeriodicJob job = PeriodicJob.newInstance(); + String messageId = rqueueMessageEnqueuer.enqueuePeriodic(periodicJobQueue, job, 2000); + TimeoutUtils.waitFor( + () -> consumedMessageService.getConsumedMessages(job.getId()).size() > 1, + "at least two execution"); + rqueueMessageManager.deleteMessage(periodicJobQueue, messageId); + } +} diff --git a/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/services/ConsumedMessageRepositoryImpl.java b/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/services/ConsumedMessageRepositoryImpl.java index 3c2933c0..268c37aa 100644 --- a/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/services/ConsumedMessageRepositoryImpl.java +++ b/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/services/ConsumedMessageRepositoryImpl.java @@ -18,6 +18,8 @@ import com.github.sonus21.rqueue.test.entity.ConsumedMessage; import com.github.sonus21.rqueue.test.repository.ConsumedMessageRepository; +import java.util.Collection; +import java.util.List; import java.util.Optional; import javax.persistence.EntityManager; import javax.persistence.EntityManagerFactory; @@ -82,4 +84,24 @@ public void deleteAll(Iterable entities) {} @Override public void deleteAll() {} + + @Override + public List findByQueueName(String queueName) { + return null; + } + + @Override + public List findByMessageId(String messageId) { + return null; + } + + @Override + public List findByMessageIdIn(Collection messageIds) { + return null; + } + + @Override + public ConsumedMessage findByMessageIdAndTag(String messageId, String tag) { + return null; + } }