Skip to content

Commit

Permalink
Web Interface:
Browse files Browse the repository at this point in the history
- Web interface to visualize queue
- Move message from one queue to another
- Latency visualizer
- Delete message from the queue

Other Changes
- Allow prefixing redis keys to avoid accidental key delete
- Allow deactivating a consumer in a given environment
- Redis cluster support
- Complete isolation of Redis, allow application to configure one Redis for the application and one for the Rqueue
- Queue concurrency
- Queue priority (Weighted and strict)
- Queue priority at group level
- Queue priority at sub queue level like critical, high, medium, low

Breaking Changes
- Queue names are prefixed, version 1.0 users need to set a redis key __rq::version with value 1
- Renamed annotation field maxJobExecutionTime to visibilityTimeout

Fixes
- Spring Optional Micrometer, in older version config class was importing micrometer related classes, that could lead to error if classes are not found. In this version now code depends on bean name using DependsOn annotation.

Non-Functional Change
- Remove getter, setter etc related code and replaced them with lombok where-ever applicable.
- Use static import for Assert utils
- Allow sharing of redis message listener container, if it's disabled then a new Redis listener container would be created. This will allow completely to use two different databases, one for the listener and one for the application.'
  • Loading branch information
sonus21 committed May 10, 2020
1 parent cbd6b80 commit 1e188a4
Show file tree
Hide file tree
Showing 327 changed files with 61,749 additions and 3,392 deletions.
2 changes: 0 additions & 2 deletions .travis.yml
Expand Up @@ -24,7 +24,6 @@ jobs:
- SPRING_DATA_VERSION=2.1.2.RELEASE
- MICROMETER_VERSION=1.1.0
script:
- ./gradlew check
- ./gradlew codeCoverageReport
- stage: spring-boot-2.2
env:
Expand All @@ -33,7 +32,6 @@ jobs:
- SPRING_DATA_VERSION=2.2.0.RELEASE
- MICROMETER_VERSION=1.3.2
script:
- ./gradlew check
- ./gradlew codeCoverageReport


Expand Down
65 changes: 44 additions & 21 deletions CHANGELOG.md
Expand Up @@ -4,43 +4,66 @@ All notable changes to Rqueue project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).


## [2.0.0] - TBD
### Added
- Web interface to visualize queue
- Move message from one queue to another
- Latency visualizer
- Delete message from the queue
- Allow deactivating a consumer in a given environment
- Single or multiple execution of polled messages
- Queue level concurrency
- BackOff for failed message, linear or exponential
- Group level queue priority
- Multi level queue priority
- Strict or weighted algorithm for message execution

### Breaking Changes
- Queue names are prefixed, that can lead to error. 1.x users set REDIS key `__rq::version` to `1`. It does try to find the version using key prefix, but if all queues are empty or no key exist in REDIS with prefix `rqueue-` then it will consider version 2.
- Renamed annotation field `maxJobExecutionTime` to `visibilityTimeout`

### Fixes
- **Spring** Optional Micrometer, in older version config class was importing micrometer related classes, that could lead to error if classes are not found. In this version now code depends on bean name using DependsOn annotation.

## [1.4.0] - 08-Apr-2020
#### Added
- Allow queue level configuration of job execution time.
- Support to add Message processor for discard and dead letter queue
* Allow queue level configuration of job execution time.
* Support to add Message processor for discard and dead letter queue

## [1.3.2] - 01-Apr-2020
### Added
- Support lower version of spring 2.1.x
* Support lower version of spring 2.1.x


## [1.3.1] - 27-Feb-2020
### Fixed
- Bootstrap issue due to optional dependencies of micrometer
* **Fixed** Bootstrap issue due to optional dependencies of micrometer


## [1.3] - 11-Dec-2019
### Added
- Expose 6 queue metrics using micrometer. (queue-size, delay queue size, processing queue size, dead letter queue size, execution counter, failure counter)
- An api to move messages from dead letter queue to other queue. (Any source queue to target queue).
### Added
* Expose 6 queue metrics using micrometer. (queue-size, delay queue size, processing queue size, dead letter queue size, execution counter, failure counter)
* An api to move messages from dead letter queue to other queue. (Any source queue to target queue).

### Fixed
- An issue in scheduler that's always scheduling job at the delay of 5 seconds. (this leads to messages are not copied from delayed queue to main queue on high load)
* An issue in scheduler that's always scheduling job at the delay of 5 seconds. (this leads to messages are not copied from delayed queue to main queue on high load)


## [1.2] - 03-Nov-2019
- Fixed typo of *Later* to *Letter*
* Fixed typo of *Later* to *Letter*


## [1.1] - 02-Nov-2019
### Added
- At least once message guarantee
- Reduced ZSET calls
- Use Lua script to execute synchronized task



* At least once message guarantee
* Reduced ZSET calls
* Lua script to make atomic operation

## [1.0] - 23-Oct-2019
- Basic version of Asynchronous task execution

* Basic version of Asynchronous task execution using Redis for Spring and Spring Boot

[1.0]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue/1.0-RELEASE
[1.1]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue/1.1-RELEASE
[1.2]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue/1.2-RELEASE
[1.3]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue/1.3-RELEASE
[1.3.1]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue/1.3.1-RELEASE
[1.3.2]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue/1.3.2-RELEASE
[1.4.0]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue/1.4.0-RELEASE
[2.0.0]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue/2.0.0-RELEASE
136 changes: 97 additions & 39 deletions README.md
@@ -1,6 +1,6 @@
<img align="right" src="https://raw.githubusercontent.com/sonus21/rqueue/master/docs/static/RQueue-icon.png" alt="Rqueue Logo" width="350">
<img align="left" src="https://raw.githubusercontent.com/sonus21/rqueue/master/rqueue/src/main/resources/public/rqueue/img/android-chrome-192x192.png" alt="Rqueue Logo" width="110">

# Rqueue: RedisQueue, Redis Task Queue for Spring and Spring Boot
# Rqueue: Redis Queue,Task Queue, Delayed Queue for Spring and Spring Boot

[![Build Status](https://travis-ci.org/sonus21/rqueue.svg?branch=master)](https://travis-ci.org/sonus21/rqueue)
[![Coverage Status](https://coveralls.io/repos/github/sonus21/rqueue/badge.svg?branch=master)](https://coveralls.io/github/sonus21/rqueue?branch=master)
Expand All @@ -9,10 +9,22 @@

Rqueue is an asynchronous task executor(worker) built for spring framework based on the spring framework's messaging library backed by Redis.

### Some of the features

* A message can be delayed for an arbitrary period of time or delivered immediately.
* A message can be delayed for an arbitrary period of time or delivered immediately.
* Multiple messages can be consumed in parallel by different workers.
* Message delivery: It's guaranteed that a message is consumed **at least once**. (Message would be consumed by a worker more than once due to the failure in the underlying worker/restart-process etc, otherwise exactly one delivery)
* Support Redis cluster
* Queue metrics
* Different Redis connection for application and worker
* Web interface for queue management and queue stats
* Automatic message serialization and deserialization
* Consumer concurrency
* Group level queue priority( weighted and strict)
* Sub queue priority (weighted and strict)
* Task execution back off, exponential and fixed back off




### Adding a task
Expand All @@ -38,15 +50,14 @@ Add Dependency
* Get latest one from [Maven central](https://search.maven.org/search?q=g:com.github.sonus21%20AND%20a:rqueue-spring-boot-starter)
* Gradle
```groovy
implementation 'com.github.sonus21:rqueue-spring-boot-starter:1.3-RELEASE'
implementation 'com.github.sonus21:rqueue-spring-boot-starter:2.0.0-RELEASE'
```
* Maven
```xml
<dependency>
<groupId>com.github.sonus21</groupId>
<artifactId>rqueue-spring-boot-starter</artifactId>
<version>1.3-RELEASE</version>
<type>pom</type>
<version>2.0.0-RELEASE</version>
</dependency>
```

Expand All @@ -56,15 +67,14 @@ Add Dependency
Get latest one from [Maven central](https://search.maven.org/search?q=g:com.github.sonus21%20AND%20a:rqueue-spring)
* Gradle
```groovy
implementation 'com.github.sonus21:rqueue-spring:1.3-RELEASE'
implementation 'com.github.sonus21:rqueue-spring:2.0.0-RELEASE'
```
* Maven
```xml
<dependency>
<groupId>com.github.sonus21</groupId>
<artifactId>rqueue-spring</artifactId>
<version>1.3-RELEASE</version>
<type>pom</type>
<version>2.0.0-RELEASE</version>
</dependency>
```

Expand Down Expand Up @@ -97,52 +107,49 @@ public class MessageListener {
log.info("simple-queue: {}", message);
}

@RqueueListener(value = "delayed-queue", delayedQueue = "true")
public void delayedMessage(String message) {
log.info("delayedMessage: {}", message);
}

@RqueueListener(value = "delayed-queue-2", delayedQueue = "true",
numRetries="3", deadLetterQueue="failed-delayed-queue")
public void delayedMessageWithDlq(String message) {
log.info("delayedMessageWithDlq: {}", message);
}

@RqueueListener(value = "job-queue", delayedQueue = "true",
// Scheduled Job notification
@RqueueListener(value = "job-queue",
numRetries="3", deadLetterQueue="failed-job-queue")
public void onMessage(Job job) {
log.info("Job created: {}", job);
}

@RqueueListener(value = "notification-queue", delayedQueue = "true",
// Scheduled push notification
@RqueueListener(value = "push-notification-queue",
numRetries="3", deadLetterQueue="failed-notification-queue")
public void onMessage(Notification notification) {
log.info("Notification message: {}", notification);
}

// asynchronously send otp to the user
@RqueueListener(value = "otp", priority="critical=10,high=8,medium=4,low=1")
public void onMessage(Otp otp) {
log.info("Otp message: {}", otp);
}
}
```

### Message publishing or task submission
All messages can be send using `RqueueMessageSender` bean's methods. It has handful number of put methods, we can use one of them based on the use case.
All messages can be send using `RqueueMessageSender` bean's methods. It has handful number of `enqueue` and `enqueueIn` methods, we can use one of them based on the use case.


```java
public class MessageService {
@AutoWired private RqueueMessageSender rqueueMessageSender;

public void doSomething(){
rqueueMessageSender.put("simple-queue", "Rqueue is configured");
rqueueMessageSender.enqueue("simple-queue", "Rqueue is configured");
}

public void createJOB(Job job){
//do something
rqueueMessageSender.put("job-queue", job);
rqueueMessageSender.enqueue("job-queue", job);
}

// send notification in 30 seconds
public void sendNotification(Notification notification){
//do something
rqueueMessageSender.put("notification-queue", notification, 30*1000L);
rqueueMessageSender.enqueueIn("notification-queue", notification, 30*1000L);
}
}
```
Expand Down Expand Up @@ -179,13 +186,18 @@ class Application{


```java
// Create redis connection factory of your choice either Redission or Lettuce or Jedis
// Get redis configuration
RedisConfiguration redisConfiguration = new RedisConfiguration();
// Set fields of redis configuration
// Create lettuce connection factory
LettuceConnectionFactory lettuceConnectionFactory = new LettuceConnectionFactory(redisConfiguration);
factory.setRedisConnectionFactory(lettuceConnectionFactory);
@Bean
public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory(RqueueMessageHandler rqueueMessageHandler){
// Create redis connection factory of your choice either Redission or Lettuce or Jedis
// Get redis configuration
RedisConfiguration redisConfiguration = new RedisConfiguration();
// Set fields of redis configuration
// Create lettuce connection factory
LettuceConnectionFactory lettuceConnectionFactory = new LettuceConnectionFactory(redisConfiguration);
factory.setRedisConnectionFactory(lettuceConnectionFactory);
facory.setRqueueMessageHandler(rqueueMessageHandler);
return factory;
}
```
---

Expand All @@ -209,7 +221,7 @@ SimpleRqueueListenerContainerFactory factory = new SimpleRqueueListenerContaine
factory.setMaxNumWorkers(10);
```

By default number of task executors are same as number of queues. A custom or shared task executor can be configured using factory's `setTaskExecutor` method, we need to provide an implementation of AsyncTaskExecutor
By default number of task executors are twice of the number of queues. A custom or shared task executor can be configured using factory's `setTaskExecutor` method, we need to provide an implementation of AsyncTaskExecutor


```java
Expand All @@ -224,11 +236,33 @@ factory.setTaskExecutor(threadPoolTaskExecutor);

---
**Manual/Auto start of the container**

Whenever container is refreshed then it can be started automatically or manfully. Default behaviour is to start automatically, to change this behaviour set auto-start to false.
Whenever container is refreshed or application is started then it is be started automatically and it comes with a graceful shutdown.
Automatic start of the container can be controlled using `autoStartup` flag, when autoStartup is false then application must call start and stop
method of container to start and stop the container, for further graceful shutdown you should call destroy method as well.

```java
factory.setAutoStartup(false);

```

```java
public class BootstrapController {
@Autowired private RqueueMessageListenerContainer rqueueMessageListenerContainer;
// ...
public void start(){
// ...
rqueueMessageListenerContainer.start();
}
public void stop(){
// ...
rqueueMessageListenerContainer.stop();
}
public void destroy(){
// ...
rqueueMessageListenerContainer.destroy();
}
//...
}
```

---
Expand Down Expand Up @@ -282,13 +316,37 @@ All these metrics are tagged
[![Grafana Dashboard](https://raw.githubusercontent.com/sonus21/rqueue/master/docs/static/grafana-dashboard.png)](https://raw.githubusercontent.com/sonus21/rqueue/master/docs/static/grafana-dashboard.png)


## Web Interface
* Queue stats (Scheduled, waiting to run, running, moved to dead letter queue)
* Latency: Min/Max and Average task execution time
* Queue Management: Move tasks from one queue to another
* Delete enqueued tasks

Link: [http://localhost:8080/rqueue](http://localhost:8080/rqueue)

**Configuration**
* Add resource handler to handle the static resources.
```java
public class MvcConfig implements WebMvcConfigurer {

@Override
public void addResourceHandlers(ResourceHandlerRegistry registry) {
//...
if (!registry.hasMappingForPattern("/**")) {
registry.addResourceHandler("/**").addResourceLocations("classpath:/public/");
}
}
}
```

All paths are under `/rqueue/**`. for authentication add interceptor(s) that would check for the session etc.

## Support
Please report problems/bugs to issue tracker. You are most welcome for any pull requests for feature/issue.
Please report problem, bug or feature(s) to [issue](https://github.com/sonus21/rqueue/issues/new/choose) tracker. You are most welcome for any pull requests for feature, issue or enhancement.

## License
The Rqueue is released under version 2.0 of the Apache License.

© [Sonu Kumar](mailto:sonunitw12@gmail.com) 2019-Instant.now

The Rqueue is released under version 2.0 of the Apache License.


0 comments on commit 1e188a4

Please sign in to comment.