Hutch 是一个利用 quarkus extension 的方式实现的与 https://github.com/wppurking/hutch-schedule 兼容的后端消息任务处理库.
其设计为利用 RabbitMQ 的 topic exchange 进行消息交互, 利用 classic queue 的 DLX + message ttl 机制进行消息重投的类似后端任务系统. 消息定时采取的是梯度式的延时, 因为 RabbitMQ 的限制无法保证秒级别的精准, 只拥有固定梯度的定时延迟.
- 在 quarkus 项目的 build.gradle 中添加 github 上的 packages
repositories {
mavenCentral()
maven {
name 'github packages'
url 'https://maven.pkg.github.com/wppurking/hutch-java'
}
mavenLocal()
}
- 添加依赖
implementation 'com.easyacc:hutch:1.0.0'
- 添加需要的 rabbitmq 的配置, 这里使用 yaml 格式方便阅读
quarkus:
log:
level: INFO
# 使用 idea 会进行自动补全
hutch:
name: lake_web
virtual-host: /
username: guest
password: guest
hostname: 127.0.0.1
# 是否使用 quorum queue
quorum: false
port: 5672
- 添加 Consumer 实现 HutchConsumer 接口
public class AbcTest implements HutchConsumer {
private static final Logger log = Logger.getLogger(AbcTest.class);
public void onMessage(Message message) {
log.info(
"Thraed: {}, msg: {}, prps: {}",
Thread.currentThread().getName(),
message.getBodyContentAsString(),
message.getMessageProperties());
}
// 设置最大的重试次数
@Override
public int maxRetry() {
return 5;
}
// 设置当前 AbcTest 的 queue 的最大并发 consumers
@Override
public int concurrency() {
return 100;
}
}
- 自己在代码中进行启动 (后期可能会改为让 quarkus 自行合成好 Hutch bean 实例自行启动与停止)
@ApplicationScoped
public class App {
@Inject
HutchConfig config;
Hutch hutch;
void onStart(@Observes StartupEvent ev) throws IOException {
this.hutch = new Hutch(this.config).start();
}
void onStop(@Observes ShutdownEvent ev) {
this.hutch.stop();
}
}
- 使用 quarkus cli 执行
quarkus dev
自动检查依赖并启动.
在 hutch-java 中, 所有涉及到的 queue 只能同时使用一种类型的 queue type, 要么全部都是默认的 classic queue, 要么就全部切换到集群方式的 quorum queue. 不在 HutchConsumer 级别做 quorum queue 申明的原因有:
- 对 rabbitmq 版本到 3.10 以后, quorum queue 与 classic queue 的区别已经非常小
- quorum queue 可以在单节点也可以在多节点上使用
- quorum queue 的高可用性非常用于代替 classic 是非常大的吸引力
- 如果是产品环境, 大多数情况都应该部署高可用环境, 同时使用 quorum queue
所以, 在全局做了一个 quorum queue 开启关闭的开关.
- maven 3.9 的 部署问题: community/community#49001