Skip to content

Commit

Permalink
Merge 65289b7 into dc1fb3b
Browse files Browse the repository at this point in the history
  • Loading branch information
sonus21 committed Dec 5, 2020
2 parents dc1fb3b + 65289b7 commit caf1417
Show file tree
Hide file tree
Showing 39 changed files with 1,452 additions and 257 deletions.
13 changes: 10 additions & 3 deletions README.md
Expand Up @@ -3,11 +3,11 @@
<h1 style="float:left">Rqueue: Redis Queue,Task Queue, Delayed Queue for Spring and Spring Boot</h1>
</div>

[![Build Status](https://travis-ci.org/sonus21/rqueue.svg?branch=master)](https://travis-ci.org/sonus21/rqueue)
[![Build Status](https://circleci.com/gh/sonus21/rqueue/tree/master.svg?style=shield)](https://circleci.com/gh/sonus21/rqueue/tree/master)
[![Coverage Status](https://coveralls.io/repos/github/sonus21/rqueue/badge.svg?branch=master)](https://coveralls.io/github/sonus21/rqueue?branch=master)
[![Maven Central](https://img.shields.io/maven-central/v/com.github.sonus21/rqueue-core)](https://repo1.maven.org/maven2/com/github/sonus21/rqueue-core)
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](LICENSE)
[![Javadoc](https://javadoc.io/badge2/com.github.sonus21/rqueue-core/javadoc.svg)](https://javadoc.io/doc/com.github.sonus21/rqueue-core)
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](LICENSE)

**Rqueue** is an asynchronous task executor(worker) built for spring framework based on the spring framework's messaging library backed by Redis. It can be used as message broker as well, where all services code is in Spring.

Expand All @@ -18,7 +18,7 @@
* **Message delivery**: It's guaranteed that a message is consumed **at least once**. (Message would be consumed by a worker more than once due to the failure in the underlying worker/restart-process etc, otherwise exactly one delivery)
* **Redis cluster** : Redis cluster can be used with driver.
* **Metrics** : In flight messages, waiting for consumption and delayed messages
* **Web interface**: a web interface to manage a queue and queue insights including latency
* **Web Dashboard**: a web dashboard to manage a queue and queue insights including latency
* **Automatic message serialization and deserialization**
* **Concurrency**: Concurrency of any queue can be configured
* **Queue Priority** :
Expand All @@ -28,6 +28,7 @@
* **Callbacks** : Callbacks for dead letter queue, discard etc
* **Events** 1. Bootstrap event 2. Task execution event.
* **Unique message** : Unique message processing for a queue based on the message id
* **Periodic message** : Process same message at certain interval
* **Redis connection**: A different redis setup can be used for Rqueue

## Getting Started
Expand Down Expand Up @@ -117,6 +118,12 @@ public class MessageService {
public void sendSms(Sms sms, SmsPriority priority){
rqueueMessageEnqueuer.enqueueWithPriority("sms-queue", priority.value(), sms);
}

// enqueue periodic job, email should be sent every 30 seconds
public void sendPeriodicEmail(Email email){
rqueueMessageEnqueuer.enqueuePeriodic("email-queue", invoice, 30_000);
}

}
```

Expand Down
Expand Up @@ -25,6 +25,7 @@
import com.github.sonus21.rqueue.test.dto.FeedGeneration;
import com.github.sonus21.rqueue.test.dto.Job;
import com.github.sonus21.rqueue.test.dto.Notification;
import com.github.sonus21.rqueue.test.dto.PeriodicJob;
import com.github.sonus21.rqueue.test.dto.Reservation;
import com.github.sonus21.rqueue.test.dto.ReservationRequest;
import com.github.sonus21.rqueue.test.dto.Sms;
Expand All @@ -36,6 +37,7 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
Expand All @@ -49,13 +51,46 @@ public class MessageListener {
@NonNull private ConsumedMessageService consumedMessageService;
@NonNull private FailureManager failureManager;

@Value("${job.queue.name}")
private String jobQueue;

@Value("${notification.queue.name}")
private String notificationQueueName;

@Value("${email.queue.name}")
private String emailQueue;

@Value("${sms.queue}")
private String smsQueue;

@Value("${chat.indexing.queue}")
private String chatIndexingQueue;

@Value("${feed.generation.queue}")
private String feedGenerationQueue;

@Value("${reservation.queue}")
private String reservationQueue;

@Value("${reservation.request.queue.name}")
private String reservationRequestQueue;

@Value("${reservation.request.dead.letter.queue.name}")
private String reservationRequestDeadLetterQueue;

@Value("${list.email.queue.name}")
private String listEmailQueue;

@Value("${periodic.job.queue.name}")
private String periodicJobQueue;

@RqueueListener(value = "${job.queue.name}", active = "${job.queue.active}")
public void onMessage(Job job) throws Exception {
log.info("Job: {}", job);
if (failureManager.shouldFail(job.getId())) {
throw new Exception("Failing job task to be retried" + job);
}
consumedMessageService.save(job, null);
consumedMessageService.save(job, null, jobQueue);
}

@RqueueListener(
Expand All @@ -69,7 +104,7 @@ public void onMessage(
if (failureManager.shouldFail(notification.getId())) {
throw new Exception("Failing notification task to be retried" + notification);
}
consumedMessageService.save(notification, null);
consumedMessageService.save(notification, null, notificationQueueName);
}

@RqueueListener(
Expand All @@ -84,7 +119,7 @@ public void onMessage(Email email, @Header(RqueueMessageHeaders.MESSAGE) RqueueM
if (failureManager.shouldFail(email.getId())) {
throw new Exception("Failing email task to be retried" + email);
}
consumedMessageService.save(email, null);
consumedMessageService.save(email, null, emailQueue);
}

@RqueueListener(
Expand All @@ -98,7 +133,7 @@ public void onMessage(Sms sms) throws Exception {
if (failureManager.shouldFail(sms.getId())) {
throw new Exception("Failing sms task to be retried" + sms);
}
consumedMessageService.save(sms, null);
consumedMessageService.save(sms, null, smsQueue);
}

@RqueueListener(
Expand All @@ -112,7 +147,7 @@ public void onMessage(ChatIndexing chatIndexing) throws Exception {
if (failureManager.shouldFail(chatIndexing.getId())) {
throw new Exception("Failing chat indexing task to be retried" + chatIndexing);
}
consumedMessageService.save(chatIndexing, null);
consumedMessageService.save(chatIndexing, null, chatIndexingQueue);
}

@RqueueListener(
Expand All @@ -126,7 +161,7 @@ public void onMessage(FeedGeneration feedGeneration) throws Exception {
if (failureManager.shouldFail(feedGeneration.getId())) {
throw new Exception("Failing feedGeneration task to be retried" + feedGeneration);
}
consumedMessageService.save(feedGeneration, null);
consumedMessageService.save(feedGeneration, null, feedGenerationQueue);
}

@RqueueListener(
Expand All @@ -140,7 +175,7 @@ public void onMessage(Reservation reservation) throws Exception {
if (failureManager.shouldFail(reservation.getId())) {
throw new Exception("Failing reservation task to be retried" + reservation);
}
consumedMessageService.save(reservation, null);
consumedMessageService.save(reservation, null, reservationQueue);
}

@RqueueListener(
Expand All @@ -154,7 +189,7 @@ public void onMessageReservationRequest(ReservationRequest request) throws Excep
if (failureManager.shouldFail(request.getId())) {
throw new Exception("Failing reservation request task to be retried" + request);
}
consumedMessageService.save(request, null);
consumedMessageService.save(request, null, reservationRequestQueue);
}

@RqueueListener(
Expand All @@ -164,15 +199,29 @@ public void onMessageReservationRequest(ReservationRequest request) throws Excep
public void onMessageReservationRequestDeadLetterQueue(ReservationRequest request)
throws Exception {
log.info("ReservationRequest Dead Letter Queue{}", request);
consumedMessageService.save(request, "reservation-request-dlq");
consumedMessageService.save(
request, "reservation-request-dlq", reservationRequestDeadLetterQueue);
}

@RqueueListener(value = "${list.email.queue.name}", active = "${list.email.queue.enabled}")
public void onMessageEmailList(List<Email> emailList) throws JsonProcessingException {
log.info("onMessageEmailList {}", emailList);
String consumedId = UUID.randomUUID().toString();
for (Email email : emailList) {
consumedMessageService.save(email, consumedId);
consumedMessageService.save(email, consumedId, listEmailQueue);
}
}

@RqueueListener(
value = "${periodic.job.queue.name}",
active = "${periodic.job.queue.active}",
deadLetterQueue = "${periodic.job.dead.letter.queue.name}",
numRetries = "${periodic.job.queue.retry.count}")
public void onPeriodicJob(PeriodicJob periodicJob) throws Exception {
log.info("onPeriodicJob: {}", periodicJob);
if (failureManager.shouldFail(periodicJob.getId())) {
throw new Exception("Failing PeriodicJob task to be retried" + periodicJob);
}
consumedMessageService.save(periodicJob, UUID.randomUUID().toString(), periodicJobQueue);
}
}
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.Executors;
import javax.sql.DataSource;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
Expand All @@ -37,6 +38,7 @@
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import redis.embedded.RedisServer;

@Slf4j
public abstract class ApplicationBasicConfiguration {
private static final Logger monitorLogger = LoggerFactory.getLogger("monitor");
protected RedisServer redisServer;
Expand All @@ -58,11 +60,20 @@ public abstract class ApplicationBasicConfiguration {
@Value("${monitor.thread.count:0}")
protected int monitorThreads;

@Value("${monitor.enabled:false}")
protected boolean monitoringEnabled;

protected void init() {
if (monitoringEnabled && monitorThreads == 0) {
monitorThreads = 1;
}
if (monitorThreads > 0) {
executorService = Executors.newFixedThreadPool(monitorThreads);
processes = new ArrayList<>();
}
if (monitoringEnabled) {
monitor(redisHost, redisPort);
}
if (useSystemRedis) {
return;
}
Expand Down Expand Up @@ -92,6 +103,7 @@ protected void destroy() {
}

protected void monitor(String host, int port) {
log.info("Monitor {}:{}", host, port);
executorService.submit(
() -> {
try {
Expand Down
Expand Up @@ -29,6 +29,7 @@
import com.github.sonus21.rqueue.core.support.RqueueMessageUtils;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
import com.github.sonus21.rqueue.test.entity.ConsumedMessage;
import com.github.sonus21.rqueue.test.service.ConsumedMessageService;
import com.github.sonus21.rqueue.test.service.FailureManager;
import com.github.sonus21.rqueue.utils.StringUtils;
Expand Down Expand Up @@ -101,10 +102,13 @@ public abstract class SpringTestBase extends TestBase {
@Value("${list.email.queue.name}")
protected String listEmailQueue;

@Value("${periodic.job.queue.name}")
protected String periodicJobQueue;

protected void enqueue(Object message, String queueName) {
RqueueMessage rqueueMessage =
RqueueMessageUtils.buildMessage(
rqueueMessageManager.getMessageConverter(), message, queueName, null, null, null);
rqueueMessageManager.getMessageConverter(), queueName, message, null, null, null);
rqueueMessageTemplate.addMessage(queueName, rqueueMessage);
}

Expand All @@ -113,7 +117,7 @@ protected void enqueue(String queueName, Factory factory, int n) {
Object object = factory.next(i);
RqueueMessage rqueueMessage =
RqueueMessageUtils.buildMessage(
rqueueMessageManager.getMessageConverter(), object, queueName, null, null, null);
rqueueMessageManager.getMessageConverter(), queueName, object, null, null, null);
rqueueMessageTemplate.addMessage(queueName, rqueueMessage);
}
}
Expand All @@ -124,15 +128,15 @@ protected void enqueueIn(String zsetName, Factory factory, Delay delay, int n) {
long score = delay.getDelay(i);
RqueueMessage rqueueMessage =
RqueueMessageUtils.buildMessage(
rqueueMessageManager.getMessageConverter(), object, zsetName, null, score, null);
rqueueMessageManager.getMessageConverter(), zsetName, object, null, score, null);
rqueueMessageTemplate.addToZset(zsetName, rqueueMessage, rqueueMessage.getProcessAt());
}
}

protected void enqueueIn(Object message, String zsetName, long delay) {
RqueueMessage rqueueMessage =
RqueueMessageUtils.buildMessage(
rqueueMessageManager.getMessageConverter(), message, zsetName, null, delay, null);
rqueueMessageManager.getMessageConverter(), zsetName, message, null, delay, null);
rqueueMessageTemplate.addToZset(zsetName, rqueueMessage, rqueueMessage.getProcessAt());
}

Expand Down Expand Up @@ -185,6 +189,13 @@ protected void printQueueStats(String queueName) {
printQueueStats(Collections.singletonList(queueName));
}

protected void printConsumedMessage(String queueName) {
for (ConsumedMessage consumedMessage :
consumedMessageService.getConsumedMessagesForQueue(queueName)) {
log.info("Queue {} Msg: {}", queueName, consumedMessage);
}
}

protected void cleanQueue(String queue) {
QueueDetail queueDetail = EndpointRegistry.get(queue);
stringRqueueRedisTemplate.delete(queueDetail.getQueueName());
Expand Down
@@ -0,0 +1,37 @@
/*
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.sonus21.rqueue.test.dto;

import java.util.UUID;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.RandomStringUtils;

@Data
@NoArgsConstructor
@EqualsAndHashCode(callSuper = true)
public class PeriodicJob extends BaseQueueMessage {
private String jobName;

public static PeriodicJob newInstance() {
PeriodicJob job = new PeriodicJob();
job.setId(UUID.randomUUID().toString());
job.setJobName(RandomStringUtils.randomAlphabetic(10));
return job;
}
}

0 comments on commit caf1417

Please sign in to comment.