Skip to content

Commit

Permalink
periodic job #51
Browse files Browse the repository at this point in the history
  • Loading branch information
sonus21 committed Dec 3, 2020
1 parent b66d2cf commit 09221a0
Show file tree
Hide file tree
Showing 21 changed files with 465 additions and 85 deletions.
9 changes: 8 additions & 1 deletion README.md
Expand Up @@ -18,7 +18,7 @@
* **Message delivery**: It's guaranteed that a message is consumed **at least once**. (Message would be consumed by a worker more than once due to the failure in the underlying worker/restart-process etc, otherwise exactly one delivery)
* **Redis cluster** : Redis cluster can be used with driver.
* **Metrics** : In flight messages, waiting for consumption and delayed messages
* **Web interface**: a web interface to manage a queue and queue insights including latency
* **Web Dashboard**: a web dashboard to manage a queue and queue insights including latency
* **Automatic message serialization and deserialization**
* **Concurrency**: Concurrency of any queue can be configured
* **Queue Priority** :
Expand All @@ -28,6 +28,7 @@
* **Callbacks** : Callbacks for dead letter queue, discard etc
* **Events** 1. Bootstrap event 2. Task execution event.
* **Unique message** : Unique message processing for a queue based on the message id
* **Periodic message** : Process same message at certain interval
* **Redis connection**: A different redis setup can be used for Rqueue

## Getting Started
Expand Down Expand Up @@ -117,6 +118,12 @@ public class MessageService {
public void sendSms(Sms sms, SmsPriority priority){
rqueueMessageEnqueuer.enqueueWithPriority("sms-queue", priority.value(), sms);
}

// enqueue periodic job, email should be sent every 30 seconds
public void sendPeriodicEmail(Email email){
rqueueMessageEnqueuer.enqueuePeriodic("email-queue", invoice, 30_000);
}

}
```

Expand Down
Expand Up @@ -25,6 +25,7 @@
import com.github.sonus21.rqueue.test.dto.FeedGeneration;
import com.github.sonus21.rqueue.test.dto.Job;
import com.github.sonus21.rqueue.test.dto.Notification;
import com.github.sonus21.rqueue.test.dto.PeriodicJob;
import com.github.sonus21.rqueue.test.dto.Reservation;
import com.github.sonus21.rqueue.test.dto.ReservationRequest;
import com.github.sonus21.rqueue.test.dto.Sms;
Expand All @@ -36,6 +37,7 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
Expand All @@ -49,13 +51,46 @@ public class MessageListener {
@NonNull private ConsumedMessageService consumedMessageService;
@NonNull private FailureManager failureManager;

@Value("${job.queue.name}")
private String jobQueue;

@Value("${notification.queue.name}")
private String notificationQueueName;

@Value("${email.queue.name}")
private String emailQueue;

@Value("${sms.queue}")
private String smsQueue;

@Value("${chat.indexing.queue}")
private String chatIndexingQueue;

@Value("${feed.generation.queue}")
private String feedGenerationQueue;

@Value("${reservation.queue}")
private String reservationQueue;

@Value("${reservation.request.queue.name}")
private String reservationRequestQueue;

@Value("${reservation.request.dead.letter.queue.name}")
private String reservationRequestDeadLetterQueue;

@Value("${list.email.queue.name}")
private String listEmailQueue;

@Value("${periodic.job.queue.name}")
private String periodicJobQueue;

@RqueueListener(value = "${job.queue.name}", active = "${job.queue.active}")
public void onMessage(Job job) throws Exception {
log.info("Job: {}", job);
if (failureManager.shouldFail(job.getId())) {
throw new Exception("Failing job task to be retried" + job);
}
consumedMessageService.save(job, null);
consumedMessageService.save(job, null, jobQueue);
}

@RqueueListener(
Expand All @@ -69,7 +104,7 @@ public void onMessage(
if (failureManager.shouldFail(notification.getId())) {
throw new Exception("Failing notification task to be retried" + notification);
}
consumedMessageService.save(notification, null);
consumedMessageService.save(notification, null, notificationQueueName);
}

@RqueueListener(
Expand All @@ -84,7 +119,7 @@ public void onMessage(Email email, @Header(RqueueMessageHeaders.MESSAGE) RqueueM
if (failureManager.shouldFail(email.getId())) {
throw new Exception("Failing email task to be retried" + email);
}
consumedMessageService.save(email, null);
consumedMessageService.save(email, null, emailQueue);
}

@RqueueListener(
Expand All @@ -98,7 +133,7 @@ public void onMessage(Sms sms) throws Exception {
if (failureManager.shouldFail(sms.getId())) {
throw new Exception("Failing sms task to be retried" + sms);
}
consumedMessageService.save(sms, null);
consumedMessageService.save(sms, null, smsQueue);
}

@RqueueListener(
Expand All @@ -112,7 +147,7 @@ public void onMessage(ChatIndexing chatIndexing) throws Exception {
if (failureManager.shouldFail(chatIndexing.getId())) {
throw new Exception("Failing chat indexing task to be retried" + chatIndexing);
}
consumedMessageService.save(chatIndexing, null);
consumedMessageService.save(chatIndexing, null, chatIndexingQueue);
}

@RqueueListener(
Expand All @@ -126,7 +161,7 @@ public void onMessage(FeedGeneration feedGeneration) throws Exception {
if (failureManager.shouldFail(feedGeneration.getId())) {
throw new Exception("Failing feedGeneration task to be retried" + feedGeneration);
}
consumedMessageService.save(feedGeneration, null);
consumedMessageService.save(feedGeneration, null, feedGenerationQueue);
}

@RqueueListener(
Expand All @@ -140,7 +175,7 @@ public void onMessage(Reservation reservation) throws Exception {
if (failureManager.shouldFail(reservation.getId())) {
throw new Exception("Failing reservation task to be retried" + reservation);
}
consumedMessageService.save(reservation, null);
consumedMessageService.save(reservation, null, reservationQueue);
}

@RqueueListener(
Expand All @@ -154,7 +189,7 @@ public void onMessageReservationRequest(ReservationRequest request) throws Excep
if (failureManager.shouldFail(request.getId())) {
throw new Exception("Failing reservation request task to be retried" + request);
}
consumedMessageService.save(request, null);
consumedMessageService.save(request, null, reservationRequestQueue);
}

@RqueueListener(
Expand All @@ -164,15 +199,29 @@ public void onMessageReservationRequest(ReservationRequest request) throws Excep
public void onMessageReservationRequestDeadLetterQueue(ReservationRequest request)
throws Exception {
log.info("ReservationRequest Dead Letter Queue{}", request);
consumedMessageService.save(request, "reservation-request-dlq");
consumedMessageService.save(
request, "reservation-request-dlq", reservationRequestDeadLetterQueue);
}

@RqueueListener(value = "${list.email.queue.name}", active = "${list.email.queue.enabled}")
public void onMessageEmailList(List<Email> emailList) throws JsonProcessingException {
log.info("onMessageEmailList {}", emailList);
String consumedId = UUID.randomUUID().toString();
for (Email email : emailList) {
consumedMessageService.save(email, consumedId);
consumedMessageService.save(email, consumedId, listEmailQueue);
}
}

@RqueueListener(
value = "${periodic.job.queue.name}",
active = "${periodic.job.queue.active}",
deadLetterQueue = "${periodic.job.dead.letter.queue.name}",
numRetries = "${periodic.job.queue.retry.count}")
public void onPeriodicJob(PeriodicJob periodicJob) throws Exception {
log.info("onPeriodicJob: {}", periodicJob);
if (failureManager.shouldFail(periodicJob.getId())) {
throw new Exception("Failing PeriodicJob task to be retried" + periodicJob);
}
consumedMessageService.save(periodicJob, UUID.randomUUID().toString(), periodicJobQueue);
}
}
Expand Up @@ -29,6 +29,7 @@
import com.github.sonus21.rqueue.core.support.RqueueMessageUtils;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
import com.github.sonus21.rqueue.test.entity.ConsumedMessage;
import com.github.sonus21.rqueue.test.service.ConsumedMessageService;
import com.github.sonus21.rqueue.test.service.FailureManager;
import com.github.sonus21.rqueue.utils.StringUtils;
Expand Down Expand Up @@ -101,6 +102,9 @@ public abstract class SpringTestBase extends TestBase {
@Value("${list.email.queue.name}")
protected String listEmailQueue;

@Value("${periodic.job.queue.name}")
protected String periodicJobQueue;

protected void enqueue(Object message, String queueName) {
RqueueMessage rqueueMessage =
RqueueMessageUtils.buildMessage(
Expand Down Expand Up @@ -185,6 +189,13 @@ protected void printQueueStats(String queueName) {
printQueueStats(Collections.singletonList(queueName));
}

protected void printConsumedMessage(String queueName) {
for (ConsumedMessage consumedMessage :
consumedMessageService.getConsumedMessagesForQueue(queueName)) {
log.info("Queue {} Msg: {}", queueName, consumedMessage);
}
}

protected void cleanQueue(String queue) {
QueueDetail queueDetail = EndpointRegistry.get(queue);
stringRqueueRedisTemplate.delete(queueDetail.getQueueName());
Expand Down
@@ -0,0 +1,37 @@
/*
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.sonus21.rqueue.test.dto;

import java.util.UUID;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.RandomStringUtils;

@Data
@NoArgsConstructor
@EqualsAndHashCode(callSuper = true)
public class PeriodicJob extends BaseQueueMessage {
private String jobName;

public static PeriodicJob newInstance() {
PeriodicJob job = new PeriodicJob();
job.setId(UUID.randomUUID().toString());
job.setJobName(RandomStringUtils.randomAlphabetic(10));
return job;
}
}
Expand Up @@ -18,13 +18,17 @@

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.persistence.Table;
import javax.persistence.UniqueConstraint;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.hibernate.annotations.GenericGenerator;

@AllArgsConstructor
@NoArgsConstructor
Expand All @@ -33,12 +37,32 @@
@ToString
@EqualsAndHashCode
@Entity
@Table(
name = "consumed_messages",
uniqueConstraints = {@UniqueConstraint(columnNames = {"message_id", "tag"})})
public class ConsumedMessage {
@Id private String id;

@Id
@GeneratedValue(generator = "UUID")
@GenericGenerator(name = "UUID", strategy = "org.hibernate.id.UUIDGenerator")
@Column
private String id;

@Column(name = "message_id")
private String messageId;

@Column(name = "tag")
private String tag;

private String queueName;

// Around 1 MB of data
@Column(length = 1000000)
private String message;

@Column private String tag;
@Column private Long createdAt;

public ConsumedMessage(String messageId, String tag, String queueName, String message) {
this(null, messageId, tag, queueName, message, System.currentTimeMillis());
}
}
Expand Up @@ -17,6 +17,16 @@
package com.github.sonus21.rqueue.test.repository;

import com.github.sonus21.rqueue.test.entity.ConsumedMessage;
import java.util.Collection;
import java.util.List;
import org.springframework.data.repository.CrudRepository;

public interface ConsumedMessageRepository extends CrudRepository<ConsumedMessage, String> {}
public interface ConsumedMessageRepository extends CrudRepository<ConsumedMessage, String> {
List<ConsumedMessage> findByQueueName(String queueName);

List<ConsumedMessage> findByMessageId(String messageId);

List<ConsumedMessage> findByMessageIdIn(Collection<String> messageIds);

ConsumedMessage findByMessageIdAndTag(String messageId, String tag);
}

0 comments on commit 09221a0

Please sign in to comment.