Skip to content

Commit

Permalink
Merge a75703c into dc1fb3b
Browse files Browse the repository at this point in the history
  • Loading branch information
sonus21 committed Dec 5, 2020
2 parents dc1fb3b + a75703c commit 427ba9a
Show file tree
Hide file tree
Showing 37 changed files with 1,416 additions and 255 deletions.
13 changes: 10 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
<h1 style="float:left">Rqueue: Redis Queue,Task Queue, Delayed Queue for Spring and Spring Boot</h1>
</div>

[![Build Status](https://travis-ci.org/sonus21/rqueue.svg?branch=master)](https://travis-ci.org/sonus21/rqueue)
[![Build Status](https://circleci.com/gh/sonus21/rqueue/tree/master.svg?style=shield)](https://circleci.com/gh/sonus21/rqueue/tree/master)
[![Coverage Status](https://coveralls.io/repos/github/sonus21/rqueue/badge.svg?branch=master)](https://coveralls.io/github/sonus21/rqueue?branch=master)
[![Maven Central](https://img.shields.io/maven-central/v/com.github.sonus21/rqueue-core)](https://repo1.maven.org/maven2/com/github/sonus21/rqueue-core)
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](LICENSE)
[![Javadoc](https://javadoc.io/badge2/com.github.sonus21/rqueue-core/javadoc.svg)](https://javadoc.io/doc/com.github.sonus21/rqueue-core)
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](LICENSE)

**Rqueue** is an asynchronous task executor(worker) built for spring framework based on the spring framework's messaging library backed by Redis. It can be used as message broker as well, where all services code is in Spring.

Expand All @@ -18,7 +18,7 @@
* **Message delivery**: It's guaranteed that a message is consumed **at least once**. (Message would be consumed by a worker more than once due to the failure in the underlying worker/restart-process etc, otherwise exactly one delivery)
* **Redis cluster** : Redis cluster can be used with driver.
* **Metrics** : In flight messages, waiting for consumption and delayed messages
* **Web interface**: a web interface to manage a queue and queue insights including latency
* **Web Dashboard**: a web dashboard to manage a queue and queue insights including latency
* **Automatic message serialization and deserialization**
* **Concurrency**: Concurrency of any queue can be configured
* **Queue Priority** :
Expand All @@ -28,6 +28,7 @@
* **Callbacks** : Callbacks for dead letter queue, discard etc
* **Events** 1. Bootstrap event 2. Task execution event.
* **Unique message** : Unique message processing for a queue based on the message id
* **Periodic message** : Process same message at certain interval
* **Redis connection**: A different redis setup can be used for Rqueue

## Getting Started
Expand Down Expand Up @@ -117,6 +118,12 @@ public class MessageService {
public void sendSms(Sms sms, SmsPriority priority){
rqueueMessageEnqueuer.enqueueWithPriority("sms-queue", priority.value(), sms);
}

// enqueue periodic job, email should be sent every 30 seconds
public void sendPeriodicEmail(Email email){
rqueueMessageEnqueuer.enqueuePeriodic("email-queue", invoice, 30_000);
}

}
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.github.sonus21.rqueue.test.dto.FeedGeneration;
import com.github.sonus21.rqueue.test.dto.Job;
import com.github.sonus21.rqueue.test.dto.Notification;
import com.github.sonus21.rqueue.test.dto.PeriodicJob;
import com.github.sonus21.rqueue.test.dto.Reservation;
import com.github.sonus21.rqueue.test.dto.ReservationRequest;
import com.github.sonus21.rqueue.test.dto.Sms;
Expand All @@ -36,6 +37,7 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
Expand All @@ -49,13 +51,46 @@ public class MessageListener {
@NonNull private ConsumedMessageService consumedMessageService;
@NonNull private FailureManager failureManager;

@Value("${job.queue.name}")
private String jobQueue;

@Value("${notification.queue.name}")
private String notificationQueueName;

@Value("${email.queue.name}")
private String emailQueue;

@Value("${sms.queue}")
private String smsQueue;

@Value("${chat.indexing.queue}")
private String chatIndexingQueue;

@Value("${feed.generation.queue}")
private String feedGenerationQueue;

@Value("${reservation.queue}")
private String reservationQueue;

@Value("${reservation.request.queue.name}")
private String reservationRequestQueue;

@Value("${reservation.request.dead.letter.queue.name}")
private String reservationRequestDeadLetterQueue;

@Value("${list.email.queue.name}")
private String listEmailQueue;

@Value("${periodic.job.queue.name}")
private String periodicJobQueue;

@RqueueListener(value = "${job.queue.name}", active = "${job.queue.active}")
public void onMessage(Job job) throws Exception {
log.info("Job: {}", job);
if (failureManager.shouldFail(job.getId())) {
throw new Exception("Failing job task to be retried" + job);
}
consumedMessageService.save(job, null);
consumedMessageService.save(job, null, jobQueue);
}

@RqueueListener(
Expand All @@ -69,7 +104,7 @@ public void onMessage(
if (failureManager.shouldFail(notification.getId())) {
throw new Exception("Failing notification task to be retried" + notification);
}
consumedMessageService.save(notification, null);
consumedMessageService.save(notification, null, notificationQueueName);
}

@RqueueListener(
Expand All @@ -84,7 +119,7 @@ public void onMessage(Email email, @Header(RqueueMessageHeaders.MESSAGE) RqueueM
if (failureManager.shouldFail(email.getId())) {
throw new Exception("Failing email task to be retried" + email);
}
consumedMessageService.save(email, null);
consumedMessageService.save(email, null, emailQueue);
}

@RqueueListener(
Expand All @@ -98,7 +133,7 @@ public void onMessage(Sms sms) throws Exception {
if (failureManager.shouldFail(sms.getId())) {
throw new Exception("Failing sms task to be retried" + sms);
}
consumedMessageService.save(sms, null);
consumedMessageService.save(sms, null, smsQueue);
}

@RqueueListener(
Expand All @@ -112,7 +147,7 @@ public void onMessage(ChatIndexing chatIndexing) throws Exception {
if (failureManager.shouldFail(chatIndexing.getId())) {
throw new Exception("Failing chat indexing task to be retried" + chatIndexing);
}
consumedMessageService.save(chatIndexing, null);
consumedMessageService.save(chatIndexing, null, chatIndexingQueue);
}

@RqueueListener(
Expand All @@ -126,7 +161,7 @@ public void onMessage(FeedGeneration feedGeneration) throws Exception {
if (failureManager.shouldFail(feedGeneration.getId())) {
throw new Exception("Failing feedGeneration task to be retried" + feedGeneration);
}
consumedMessageService.save(feedGeneration, null);
consumedMessageService.save(feedGeneration, null, feedGenerationQueue);
}

@RqueueListener(
Expand All @@ -140,7 +175,7 @@ public void onMessage(Reservation reservation) throws Exception {
if (failureManager.shouldFail(reservation.getId())) {
throw new Exception("Failing reservation task to be retried" + reservation);
}
consumedMessageService.save(reservation, null);
consumedMessageService.save(reservation, null, reservationQueue);
}

@RqueueListener(
Expand All @@ -154,7 +189,7 @@ public void onMessageReservationRequest(ReservationRequest request) throws Excep
if (failureManager.shouldFail(request.getId())) {
throw new Exception("Failing reservation request task to be retried" + request);
}
consumedMessageService.save(request, null);
consumedMessageService.save(request, null, reservationRequestQueue);
}

@RqueueListener(
Expand All @@ -164,15 +199,29 @@ public void onMessageReservationRequest(ReservationRequest request) throws Excep
public void onMessageReservationRequestDeadLetterQueue(ReservationRequest request)
throws Exception {
log.info("ReservationRequest Dead Letter Queue{}", request);
consumedMessageService.save(request, "reservation-request-dlq");
consumedMessageService.save(
request, "reservation-request-dlq", reservationRequestDeadLetterQueue);
}

@RqueueListener(value = "${list.email.queue.name}", active = "${list.email.queue.enabled}")
public void onMessageEmailList(List<Email> emailList) throws JsonProcessingException {
log.info("onMessageEmailList {}", emailList);
String consumedId = UUID.randomUUID().toString();
for (Email email : emailList) {
consumedMessageService.save(email, consumedId);
consumedMessageService.save(email, consumedId, listEmailQueue);
}
}

@RqueueListener(
value = "${periodic.job.queue.name}",
active = "${periodic.job.queue.active}",
deadLetterQueue = "${periodic.job.dead.letter.queue.name}",
numRetries = "${periodic.job.queue.retry.count}")
public void onPeriodicJob(PeriodicJob periodicJob) throws Exception {
log.info("onPeriodicJob: {}", periodicJob);
if (failureManager.shouldFail(periodicJob.getId())) {
throw new Exception("Failing PeriodicJob task to be retried" + periodicJob);
}
consumedMessageService.save(periodicJob, UUID.randomUUID().toString(), periodicJobQueue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.github.sonus21.rqueue.core.support.RqueueMessageUtils;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
import com.github.sonus21.rqueue.test.entity.ConsumedMessage;
import com.github.sonus21.rqueue.test.service.ConsumedMessageService;
import com.github.sonus21.rqueue.test.service.FailureManager;
import com.github.sonus21.rqueue.utils.StringUtils;
Expand Down Expand Up @@ -101,10 +102,13 @@ public abstract class SpringTestBase extends TestBase {
@Value("${list.email.queue.name}")
protected String listEmailQueue;

@Value("${periodic.job.queue.name}")
protected String periodicJobQueue;

protected void enqueue(Object message, String queueName) {
RqueueMessage rqueueMessage =
RqueueMessageUtils.buildMessage(
rqueueMessageManager.getMessageConverter(), message, queueName, null, null, null);
rqueueMessageManager.getMessageConverter(), queueName, message, null, null, null);
rqueueMessageTemplate.addMessage(queueName, rqueueMessage);
}

Expand All @@ -113,7 +117,7 @@ protected void enqueue(String queueName, Factory factory, int n) {
Object object = factory.next(i);
RqueueMessage rqueueMessage =
RqueueMessageUtils.buildMessage(
rqueueMessageManager.getMessageConverter(), object, queueName, null, null, null);
rqueueMessageManager.getMessageConverter(), queueName, object, null, null, null);
rqueueMessageTemplate.addMessage(queueName, rqueueMessage);
}
}
Expand All @@ -124,15 +128,15 @@ protected void enqueueIn(String zsetName, Factory factory, Delay delay, int n) {
long score = delay.getDelay(i);
RqueueMessage rqueueMessage =
RqueueMessageUtils.buildMessage(
rqueueMessageManager.getMessageConverter(), object, zsetName, null, score, null);
rqueueMessageManager.getMessageConverter(), zsetName, object, null, score, null);
rqueueMessageTemplate.addToZset(zsetName, rqueueMessage, rqueueMessage.getProcessAt());
}
}

protected void enqueueIn(Object message, String zsetName, long delay) {
RqueueMessage rqueueMessage =
RqueueMessageUtils.buildMessage(
rqueueMessageManager.getMessageConverter(), message, zsetName, null, delay, null);
rqueueMessageManager.getMessageConverter(), zsetName, message, null, delay, null);
rqueueMessageTemplate.addToZset(zsetName, rqueueMessage, rqueueMessage.getProcessAt());
}

Expand Down Expand Up @@ -185,6 +189,13 @@ protected void printQueueStats(String queueName) {
printQueueStats(Collections.singletonList(queueName));
}

protected void printConsumedMessage(String queueName) {
for (ConsumedMessage consumedMessage :
consumedMessageService.getConsumedMessagesForQueue(queueName)) {
log.info("Queue {} Msg: {}", queueName, consumedMessage);
}
}

protected void cleanQueue(String queue) {
QueueDetail queueDetail = EndpointRegistry.get(queue);
stringRqueueRedisTemplate.delete(queueDetail.getQueueName());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.sonus21.rqueue.test.dto;

import java.util.UUID;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.RandomStringUtils;

@Data
@NoArgsConstructor
@EqualsAndHashCode(callSuper = true)
public class PeriodicJob extends BaseQueueMessage {
private String jobName;

public static PeriodicJob newInstance() {
PeriodicJob job = new PeriodicJob();
job.setId(UUID.randomUUID().toString());
job.setJobName(RandomStringUtils.randomAlphabetic(10));
return job;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@

package com.github.sonus21.rqueue.test.entity;

import java.util.UUID;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.PreUpdate;
import javax.persistence.Table;
import javax.persistence.UniqueConstraint;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand All @@ -33,12 +37,49 @@
@ToString
@EqualsAndHashCode
@Entity
@Table(
name = "consumed_messages",
uniqueConstraints = {@UniqueConstraint(columnNames = {"message_id", "tag"})})
public class ConsumedMessage {
@Id private String id;

@Id @Column private String id;

@Column(name = "message_id")
private String messageId;

@Column(name = "tag")
private String tag;

@Column private String queueName;

// Around 1 MB of data
@Column(length = 1000000)
private String message;

@Column private String tag;
@Column private Long createdAt;

@Column private Long updatedAt;

@Column private int count;

@PreUpdate
public void update() {
this.updatedAt = System.currentTimeMillis();
}

public ConsumedMessage(String messageId, String tag, String queueName, String message) {
this(
UUID.randomUUID().toString(),
messageId,
tag,
queueName,
message,
System.currentTimeMillis(),
System.currentTimeMillis(),
1);
}

public void incrementCount() {
this.count += 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,16 @@
package com.github.sonus21.rqueue.test.repository;

import com.github.sonus21.rqueue.test.entity.ConsumedMessage;
import java.util.Collection;
import java.util.List;
import org.springframework.data.repository.CrudRepository;

public interface ConsumedMessageRepository extends CrudRepository<ConsumedMessage, String> {}
public interface ConsumedMessageRepository extends CrudRepository<ConsumedMessage, String> {
List<ConsumedMessage> findByQueueName(String queueName);

List<ConsumedMessage> findByMessageId(String messageId);

List<ConsumedMessage> findByMessageIdIn(Collection<String> messageIds);

ConsumedMessage findByMessageIdAndTag(String messageId, String tag);
}

0 comments on commit 427ba9a

Please sign in to comment.