A distributed delay queue based on Redisson.
Add this to your pom.xml
<!-- ${redisson-delay-queue.version} == ${latest.version} -->
<!-- https://central.sonatype.com/artifact/io.github.photowey/redisson-delay-queue/versions -->
<dependency>
<groupId>io.github.photowey</groupId>
<artifactId>redisson-delay-queue-spring-boot-starter</artifactId>
<version>${redisson-delay-queue.version}</version>
</dependency><dependency>
<!-- // ... -->
<artifactId>redisson-delay-queue-spring-boot3-starter</artifactId>
<!-- // ... -->
</dependency>spring:
redis:
# ...
redisson:
delayqueue:
enabled: true
mode: SINGLE
address: "redis://${local.config.redis.host}:${local.config.redis.port}"
password: "${local.config.redis.password}"
master: "master"
database: 0
timeout: 10_000
delayed:
# 7 DAYS
max: 604_800_000
# Default topic.
# Default value: io.github.photowey.global.redisson.delayqueue.topic
topic: "io.github.photowey.global.redisson.delayqueue.topic"
# Custom topics, if necessary.
topics:
- "io.github.photowey.hello.world.delayed.query.delayqueue.topic"
# Scheduled executor config.
ticker:
initial-delay: 0
period: 5
unit: "SECONDS"
# poll task timeout config.
poll:
timeout: 2
unit: "SECONDS"
# Global registry cache-key.
registry:
# Topics
topic-set: "io:github:photowey:global:redisson:delayqueue:topicset"
# TaskIds
task-set: "io:github:photowey:global:redisson:delayqueue:taskset"@Bean
@ConditionalOnMissingBean(RedissonClient.class)
public RedissonClient redisson(RedissonProperties redissonProperties) {
// Currently only single node is supported, via auto-configuration.
return this.populateRedissonClient(redissonProperties);
}RedissonDelayedQueue delayedQueue = this.applicationContext().getBean(RedissonDelayedQueue.class);@Autowired
private RedissonDelayedQueue delayedQueue
// ...RedissonDelayedQueue delayedQueue = this.applicationContext().getBean(RedissonDelayedQueue.class);
for(
int i = 0;
i< 2;i++){
RedissonDelayedTask<Serializable> task = RedissonDelayedTask.builder()
.taskId(String.valueOf((i + 1)))
.payload(String.valueOf((i + 1))) // String payload
.delayed((i + 2))
.timeUnit(TimeUnit.SECONDS.name())
.build();
delayedQueue.
offer(task);
}RedissonDelayedQueue delayedQueue = this.applicationContext().getBean(RedissonDelayedQueue.class);
for(
int i = 0;
i< 2;i++){
HelloPayload payload = HelloPayload.builder()
.id(1760223724043808770L)
.name("photowey" + (i + 1))
.age(18 + i)
.build();
RedissonDelayedTask<Serializable> task = RedissonDelayedTask.builder()
// Custom topic
.topic("io.github.photowey.hello.world.delayed.query.delayqueue.topic")
// TaskId
.taskId("io.github.photowey.hello.world" + "." + (i + 1))
// Body payload
.payload(payload)
.delayed((i + 1) * 5)
.timeUnit(TimeUnit.SECONDS.name())
.build();
delayedQueue.
offer(task);
}Add custom listener implements
DelayedQueueEventListener
@Slf4j
public class Multi1DelayedQueueEventListener implements DelayedQueueEventListener {
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE + 100;
}
@Override
public boolean supports(TaskContext<?> ctx) {
// Filter topic or task.
return ctx.topic().startsWith("io.github.photowey.hello.world")
&& ctx.taskId().startsWith("io.github.photowey.hello.world");
}
@Override
public void handle(TaskContext<?> ctx) {
// ...
}
}Add custom listener extends
AbstractAntDelayedQueueEventListener
@Slf4j
public class AntMultiDelayedQueueEventListener extends AbstractAntDelayedQueueEventListener {
@Autowired
private Counter counter;
@Override
public int getOrder() {
// The sorting order of Listener.
return Ordered.HIGHEST_PRECEDENCE + 300;
}
@Override
public boolean supports(TaskContext<?> ctx) {
// Matches
return this.matches("io.github.photowey:hello:world:*", ctx.topic())
&& this.matches("io.github.photowey.ant.*", ctx.taskId());
}
@Override
public void handle(TaskContext<?> ctx) {
// ...
}
}/**
* Determines if the given path matches the specified pattern.
*
* This method first converts the given pattern and path to the Ant expression path format.
* The conversion rules are as follows:
* 1. If the pattern or path contains a period ('.') or a colon (':'), it will be escaped to a slash ('/').
* 2. Supports wildcard characters:
* - `#` represents a single word.
* - `*` represents multiple words.
*
* After conversion, the method uses AntPathMatcher to perform the matching.
*
* @param pattern The pattern to match, supporting wildcard characters and escaping.
* @param path The path to match, supporting wildcard characters and escaping.
* @return true if the path matches the pattern; false otherwise.
*/
@Override
public boolean matches(String pattern, String path) {
String convertedPattern = this.convertToAntPattern(pattern);
String convertedPath = this.convertToAntPath(path);
return this.matcher.match(convertedPattern, convertedPath);
}