redis实现延迟队列
-
低延迟:使用redis的zset数据结构和本地时间轮实现,直接从redis中pop超时任务,避免轮询扫描,大大减少了延迟的时间
-
高可用:至少消费一次保障了定时消息一定被消费
备注:由于redis的本身的特点,redis持久化是异步的,如果发生redis宕机,可能会丢失1s的消息。
<dependency>
<groupId>com.yanghui.redis.queue</groupId>
<artifactId>redis-delay-queue-client</artifactId>
<version>${最新版本}</version>
</dependency>RedisProducer redisProducer = new RedisProducer();
redisProducer.sendDelayMessage(Message.create("order_cancel","消息:" + DateUtil.format(new Date(),"yyyy-MM-dd HH:mm:ss.SSS")),1000 * 2);备注:默认创建的RedisProducer连接的是本地redis,如果需要指定的redis,创建一个RedissonClient即可,本组件依赖Redisson;消费端一样,下面不作特殊说明了。
RedisConsumer redisConsumer = new RedisConsumer();
redisConsumer.setMaxRetryCount(5);
redisConsumer.subscribe("order_cancel");
redisConsumer.registerMessageListener(message -> {
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()) + " " + message);
return MessageStatus.SUCCESS;
});
redisConsumer.start();1、Redisson的相关配置,参考:https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95
2、消费端:
- consumeThreadMin:消费端处理消息线程池最小线程数
- consumeThreadMax:消费端处理消息线程池最大线程数
- maxRetryCount:异常时,消息的最大重试次数,目前只支持16个级别的重试,10,30,60,120,180,240,300,360,420,480,540,600,1200,1800,3600,7200,单位秒,此值只能小于等于16,默认16次,超过重试次数,即进入死信队列。
为了方便体验延迟队列的功能,这里提供一个命令行工具。
可以通过命令来查看所有的参数说明:
java -jar cli.jar -h
-
发送消息
java -jar cli.jar sendMessage -r 127.0.0.1:6379 -t order_cancel -m 消息1 -d 5000
-
消费消息
java -jar cli.jar subMessage -r 127.0.0.1:6379 -t order_cancel
topic是逻辑概念,这里对应redis的key有5个,分别的作用如下:
-
redis_delay_queue_origin:{Topic名称},对应redis的zset数据结构,value表示消息的ID,score表示消息的到期执行时间戳,会有定时任务来扫描到期的消息 -
redis_delay_queue_store:{Topic名称},对应redis的hash数据结构,hash的key表示消息ID,value表示消息的内容 -
redis_delay_queue_list:{Topic名称},对应redis的list数据结构,存储的是消息ID,每当消息到期时间到了,redis_delay_queue_origin:{Topic名称}移动到当前list中,消费者会监听这个list。 -
redis_delay_queue_pre:{Topic名称},对应redis的zset数据结构,为了保证每条消息的至少消费一次,每次把到期消息移动至redis_delay_queue_list:{Topic名称}外,这里还会放置到当前队列中,等业务真正消费完成,进行ack后,再删除。 -
redis_delay_queue_DLQ:{Topic名称},对应redis的zset数据结构,消息一旦超过重试次数,即会进入死信队列。
如果消息ack失败,消息会一直处于redis_delay_queue_pre:{Topic名称}队列中,而无法消费了,这时会启动一个定时任务,定时扫描redis_delay_queue_pre:{Topic名称},把时间超过3分钟没有ack的消息重新发布到redis_delay_queue_origin:{Topic名称}队列中,让消息重新消费。


