Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

建议延迟队列增加一个重新发送覆盖前一个任务的功能 #9

Closed
hhrensheng opened this issue Jan 22, 2024 · 22 comments
Closed
Labels
question Further information is requested

Comments

@hhrensheng
Copy link

建议延迟队列增加消息重发机制,比如增加一个参数,消息id 多次发送时如果ID相同可以覆盖之前发送的消息。

@chaz6chez chaz6chez added the enhancement New feature or request label Jan 25, 2024
@chaz6chez
Copy link
Member

是队列publish去重吗?

@chaz6chez chaz6chez added question Further information is requested and removed enhancement New feature or request labels Jan 25, 2024
@chaz6chez
Copy link
Member

队列的publish依赖redis stream 的XADD,XADD如果添加两条相同的ID的消息时,假设消息还未被消费者读取,那么后一条消息会覆盖前一条消息;
sync_publish()方法支持传递headers,可以用于指定消息id,默认是由redis stream自动生成消息id;

// 发送一条id为1的消息
sync_publish(TestBuilder::instance(), 'test message', [
    '_id' => 1
])
// 覆盖id为1的消息
sync_publish(TestBuilder::instance(), 'test message cover', [
    '_id' => 1
])

@hhrensheng
Copy link
Author

队列的publish依赖redis stream 的XADD,XADD如果添加两条相同的ID的消息时,假设消息还未被消费者读取,那么后一条消息会覆盖前一条消息; sync_publish()方法支持传递headers,可以用于指定消息id,默认是由redis stream自动生成消息id;

// 发送一条id为1的消息
sync_publish(TestBuilder::instance(), 'test message', [
    '_id' => 1
])
// 覆盖id为1的消息
sync_publish(TestBuilder::instance(), 'test message cover', [
    '_id' => 1
])

了解了,就是要这个效果。感谢

@hhrensheng
Copy link
Author

SiteOpenBuilderDelayed::instance()->publish(7, [ '_delay' => 10, '_count' => 3, '_error' => '提交错误' ]); 可以触发

SiteOpenBuilderDelayed::instance()->publish(7, [ '_delay' => 10, '_count' => 3, '_error' => '提交错误', '_id' => 'open_7', ]);触发不了队列

@chaz6chez
Copy link
Member

chaz6chez commented Jan 29, 2024

SiteOpenBuilderDelayed::instance()->publish(7, [ '_delay' => 10, '_count' => 3, '_error' => '提交错误' ]); 可以触发

SiteOpenBuilderDelayed::instance()->publish(7, [ '_delay' => 10, '_count' => 3, '_error' => '提交错误', '_id' => 'open_7', ]);触发不了队列

请补充一下错误信息

另外,id建议使用redis约定的id格式为两个数字,中间以-分割,如1234567-1

@hhrensheng
Copy link
Author

hhrensheng commented Jan 30, 2024

1234567-1

我这边手动删除了redis中的数据可以提交了。但是发现了一个问题:第一次发布延迟队列,可以正常消费。消费完成后再次向同样的消费类发布延迟消息_id改成其他,发布后没有错误信息,但是不消费,查看redis 值是空的。如果把_id删除,再发布是可以消费成功的

@hhrensheng
Copy link
Author

redis版本:6.2.6
webman-rqueue版本:2.1.4

并且我第一次设置_id发布的时候,消费端打印$value的时候发现_header里的_id还是*,这个我不确定,想验证的时候就出现上面的问题了

@hhrensheng
Copy link
Author

image
image

@chaz6chez
Copy link
Member

chaz6chez commented Jan 30, 2024

1234567-1

我这边手动删除了redis中的数据可以提交了。但是发现了一个问题:第一次发布延迟队列,可以正常消费。消费完成后再次向同样的消费类发布延迟消息_id改成其他,发布后没有错误信息,但是不消费,查看redis 值是空的。如果把_id删除,再发布是可以消费成功的

每个Builder对应一个消费group,当消息被消费了后,消息在当前消费group的游标内就不存在了,所以修改id以后该消费group的消费者是无法再读取到该消息的,但是不影响其他消费group可以读取到该id的消息。

QueueBuilder的特性是每消费完一个消息就会将group游标移动并且移除该消息;
GroupBuilder的特性是没消费完一个消息仅移动group游标,移除消息交给定时器进行处理;

你是想实现什么功能吗?

@hhrensheng
Copy link
Author

1234567-1

我这边手动删除了redis中的数据可以提交了。但是发现了一个问题:第一次发布延迟队列,可以正常消费。消费完成后再次向同样的消费类发布延迟消息_id改成其他,发布后没有错误信息,但是不消费,查看redis 值是空的。如果把_id删除,再发布是可以消费成功的

每个Builder对应一个消费group,当消息被消费了后,消息在当前消费group的游标内就不存在了,所以修改id以后该消费group的消费者是无法再读取到该消息的,但是不影响其他消费group可以读取到该id的消息。

QueueBuilder的特性是每消费完一个消息就会将group游标移动并且移除该消息; GroupBuilder的特性是没消费完一个消息仅移动group游标,移除消息交给定时器进行处理;

感谢作者的耐心解答。我觉得我还是应该恶补一下redis stream相关的知识
我只想想实现,有一个任务。我需要再在未来不确定的一些时间去重复执行它。比如2月5号需要执行这个任务,3月8再去执行这个任务,xx月xx日再次执行等等。然后未来的某一个时间我发现3月8号的任务我不需要执行了,我需要去3月9号去执行这个任务,所以我要去修改3月8号的延迟执行时间。

@hhrensheng
Copy link
Author

当然我可以在3月9号发布一个新的任务,然后在消费逻辑里来判断任务的真实执行时间,只是这样的话不是最佳处理方式。

@hhrensheng
Copy link
Author

后者说我可以通过某种方式在发布延时任务的时候获得这个任务内部的ID,如果我想修改存在队列中的这个任务时,我直接去redis里删掉这个任务,然后再添加一条新的?

@chaz6chez
Copy link
Member

后者说我可以通过某种方式在发布延时任务的时候获得这个任务内部的ID,如果我想修改存在队列中的这个任务时,我直接去redis里删掉这个任务,然后再添加一条新的?

最简单的方式就是,你自己消息体内约定一个id,然后在执行的时候去判断一下这个id是否存在黑名单里,比如使用redis储存黑名单,当存在黑名单时业务逻辑忽略,消息自然会被队列消费者移除

@hhrensheng
Copy link
Author

后者说我可以通过某种方式在发布延时任务的时候获得这个任务内部的ID,如果我想修改存在队列中的这个任务时,我直接去redis里删掉这个任务,然后再添加一条新的?

最简单的方式就是,你自己消息体内约定一个id,然后在执行的时候去判断一下这个id是否存在黑名单里,比如使用redis储存黑名单,当存在黑名单时业务逻辑忽略,消息自然会被队列消费者移除

了解,这种方式就是类似于在消费逻辑里面增加判断对吧

@chaz6chez
Copy link
Member

后者说我可以通过某种方式在发布延时任务的时候获得这个任务内部的ID,如果我想修改存在队列中的这个任务时,我直接去redis里删掉这个任务,然后再添加一条新的?

最简单的方式就是,你自己消息体内约定一个id,然后在执行的时候去判断一下这个id是否存在黑名单里,比如使用redis储存黑名单,当存在黑名单时业务逻辑忽略,消息自然会被队列消费者移除

了解,这种方式就是类似于在消费逻辑里面增加判断对吧

是的,后续版本我会增加一个publishGetIds的方法,用于发布并获取消息ID组,这个版本今天就会更新;
不过依赖消息队列的ID并不是万全之策,通常来说不建议手动通过redis基础方法对队列进行操作,因为队列的运行过程依赖这一整套流程;
当然,如果你对redis stream比较熟悉了,我建议可以根据你的需求参考自定义Builder那一部分自定义一个Builder,这样可能更靠谱一些,因为QueueBuilder和GroupBuilder都已经有属于他们自己的消费逻辑套路了。

@hhrensheng
Copy link
Author

好的感谢。
我这边新增了一个测试的延迟任务测试后发现。_id的格式有要求,如果是用80614515588308743-118类似这种格式的id就可以被正常消费的。但是我第二次想覆盖之前的任务时发现不成功。
具体表现
首先发布一个延迟60秒的任务并设置_id
image
发布一个新的延迟任务,此时任务_id是一个新的id
image
修改第一次发布的任务,把延迟时间改成10秒,此时使用第一设置的id
image

执行结果
image

@chaz6chez
Copy link
Member

好的感谢。 我这边新增了一个测试的延迟任务测试后发现。_id的格式有要求,如果是用80614515588308743-118类似这种格式的id就可以被正常消费的。但是我第二次想覆盖之前的任务时发现不成功。 具体表现 首先发布一个延迟60秒的任务并设置_id image 发布一个新的延迟任务,此时任务_id是一个新的id image 修改第一次发布的任务,把延迟时间改成10秒,此时使用第一设置的id image

执行结果 image

消息格式是{整数}-{整数}的要求格式;
不可以用消费者去验证消息是否被覆盖,可以通过redis的管理工具去查看对应消息ID是否被覆盖;
消费者是基于GROUP的消费模式,消息一旦被消费,该GROUP的消息游标就会移动,该GROUP将再也无法读取已读取过的消息;

@hhrensheng
Copy link
Author

第二个任务的消费时间02:34:43和发布时间02:34:23正好差20秒是没问题的
但是第一次提交的时间是02:34:09 后面修改它的时间是02:34:46 消费的时间是02:35:09。这个时间是第一次提交的60秒之后,而不是修改的10秒后。
并且消费逻辑里面获取的_Id是"*"而不是我发布时设置的id

@hhrensheng
Copy link
Author

好的感谢。 我这边新增了一个测试的延迟任务测试后发现。_id的格式有要求,如果是用80614515588308743-118类似这种格式的id就可以被正常消费的。但是我第二次想覆盖之前的任务时发现不成功。 具体表现 首先发布一个延迟60秒的任务并设置_id image 发布一个新的延迟任务,此时任务_id是一个新的id image 修改第一次发布的任务,把延迟时间改成10秒,此时使用第一设置的id image
执行结果 image

消息格式是{整数}-{整数}的要求格式; 不可以用消费者去验证消息是否被覆盖,可以通过redis的管理工具去查看对应消息ID是否被覆盖; 消费者是基于GROUP的消费模式,消息一旦被消费,该GROUP的消息游标就会移动,该GROUP将再也无法读取已读取过的消息;

难道覆盖的作用不是覆盖已经发布的任务吗,不然这个覆盖的意义在哪呢

@hhrensheng
Copy link
Author

我看之前说的覆盖前提是“假设消息还未被消费者读取”我想问下发布延迟任务后,消费者什么以后会读取这个任务呢。中间有我这边可以干预的步骤吗

@chaz6chez
Copy link
Member

我看之前说的覆盖前提是“假设消息还未被消费者读取”我想问下发布延迟任务后,消费者什么以后会读取这个任务呢。中间有我这边可以干预的步骤吗

redis stream是流式队列,队列在严格意义上没有延迟消息的说法,是通过requeue模拟出来的,也就是说delay消息实际上是在Builder中进行了无数次读取放回,直到delay条件达到,再触发消费回调函数;

stream的覆盖消息主要用于多GROUP消费相同队列时的对后者消费GROUP的消息修正,并不是为了延迟队列或者延迟行为场景而诞生的;

消费者的Builder的消费逻辑无法干预,需要你在handler中进行逻辑干预,或者自行实现自定义Builder实现符合你场景的Builder消费器

@hhrensheng
Copy link
Author

我看之前说的覆盖前提是“假设消息还未被消费者读取”我想问下发布延迟任务后,消费者什么以后会读取这个任务呢。中间有我这边可以干预的步骤吗

redis stream是流式队列,队列在严格意义上没有延迟消息的说法,是通过requeue模拟出来的,也就是说delay消息实际上是在Builder中进行了无数次读取放回,直到delay条件达到,再触发消费回调函数;

stream的覆盖消息主要用于多GROUP消费相同队列时的对后者消费GROUP的消息修正,并不是为了延迟队列或者延迟行为场景而诞生的;

消费者的Builder的消费逻辑无法干预,需要你在handler中进行逻辑干预,或者自行实现自定义Builder实现符合你场景的Builder消费器

感谢作者,我在学习一下redis stream

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question Further information is requested
Projects
None yet
Development

No branches or pull requests

2 participants