Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KafkaClient Bug] 使用 set_offset_timestamp 设置时间戳不生效 - Not Bug #542

Closed
hiberabyss opened this issue Aug 25, 2021 · 41 comments

Comments

@hiberabyss
Copy link

在设置了 offset_timestamp 之后,获取的消息并不是这个时间点之后的数据。

workflow 版本:0.9.7

image

@wzl12356
Copy link
Contributor

在设置了 offset_timestamp 之后,获取的消息并不是这个时间点之后的数据。

workflow 版本:0.9.7

image

kafka 是什么版本?

@hiberabyss
Copy link
Author

kafka server 的版本是 2.2

@wzl12356
Copy link
Contributor

wzl12356 commented Aug 25, 2021

在非消费者组的模式下,offset_timestamp应该在conf里面设置,示例如下:

KafkaConfig conf;

conf.set_offset_timestamp(xxx);

task->set_config(std::move(conf));

目前由于conf是针对task设置,因此设置的这个timestamp是这个task公用的

@hiberabyss
Copy link
Author

在非消费者组的模式下,offset_timestamp应该在conf里面设置,示例如下:

KafkaConfig conf;

conf.set_offset_timestamp(xxx);

task->set_config(std::move(conf));

目前由于conf是针对task设置,因此设置的这个timestamp是这个task公用的

收到,感谢!那请问如何使用消费者组的模式呢?

@wzl12356
Copy link
Contributor

kafka_client 在调用init的时候需要加上group_name作为参数,例如
client.init(url, "workflow_group");,然后不需要自己设置KafkaToppar了,示例可以参考
tutorial/tutorial-13-kafka_cli.cc

@hiberabyss
Copy link
Author

了解,感谢!

@Barenboim Barenboim changed the title [KafkaClient Bug] 使用 set_offset_timestamp 设置时间戳不生效 [KafkaClient Bug] 使用 set_offset_timestamp 设置时间戳不生效 - Not Bug Aug 26, 2021
@hiberabyss
Copy link
Author

kafka_client 在调用init的时候需要加上group_name作为参数,例如
client.init(url, "workflow_group");,然后不需要自己设置KafkaToppar了,示例可以参考
tutorial/tutorial-13-kafka_cli.cc

问下这里的 kafka task 是在什么时机会调用 callback 呢?是取完数据后需要再重新创建 kafka task 去拉数据么?

@wzl12356
Copy link
Contributor

wzl12356 commented Sep 1, 2021

callback 是当一次kafka task收到kafka broker的response后会调用,比如(produce完成或者fetch完成),kafka的协议基本上request response模式,因此当kafka broker的response返回后,callback会调用。
一次fetch的task只会拉去一次数据,因此需要再次创建task去拉去数据。

@hiberabyss
Copy link
Author

callback 是当一次kafka task收到kafka broker的response后会调用,比如(produce完成或者fetch完成),kafka的协议基本上request response模式,因此当kafka broker的response返回后,callback会调用。
一次fetch的task只会拉去一次数据,因此需要再次创建task去拉去数据。

问下 Commit task 执行完之后 offset 会被永久保存么?我执行完退出程序后,发现好像没有用历史的 offset

@Barenboim Barenboim reopened this Sep 1, 2021
@wzl12356
Copy link
Contributor

wzl12356 commented Sep 1, 2021

如果成功的话,应该会保存在kafka的特定的topic下面,下次启动fetch的时候,内部会先启动一个fetchoffset任务获取offset,然后从这个offset开始获取。

@hiberabyss
Copy link
Author

如果成功的话,应该会保存在kafka的特定的topic下面,下次启动fetch的时候,内部会先启动一个fetchoffset任务获取offset,然后从这个offset开始获取。

这个应该是会保存在 consumer group 下面吧?但我在 commit 完之后发现没有 consumer group 对应的 offset

@hiberabyss
Copy link
Author

如果成功的话,应该会保存在kafka的特定的topic下面,下次启动fetch的时候,内部会先启动一个fetchoffset任务获取offset,然后从这个offset开始获取。

这里 commit 的时候除了用 add_commit_record 外能用 add_toppar 来添加么?

我的需求是批量处理 kafka 消息,最后再 commit 下 offset ,这里不太方便构造 KafkaRecord ,能直接按 toppar 的方式添加么?

@wzl12356
Copy link
Contributor

wzl12356 commented Sep 1, 2021

在Kafka 0.9前,Committed Offset信息保存在zookeeper的[consumers/{group}/offsets/{topic}/{partition}]目录中 Offset信息保存在zookeeper的;而在0.9之后,所有的offset信息都保存在了Broker上的一个名为__consumer_offsets的topic中。

目前设计的是commit的时候使用KafkaRecord(消息)的offset,如果有消息有重复的toppar,会取最大的offset。你处理的消息的时候,调用add_commit_record 添加KafkaRecord的offset(即使有重复也没有关系,内部会取最大值),这样不方便吗?

@hiberabyss
Copy link
Author

在Kafka 0.9前,Committed Offset信息保存在zookeeper的[consumers/{group}/offsets/{topic}/{partition}]目录中 Offset信息保存在zookeeper的;而在0.9之后,所有的offset信息都保存在了Broker上的一个名为__consumer_offsets的topic中。

目前设计的是commit的时候使用KafkaRecord(消息)的offset,如果有消息有重复的toppar,会取最大的offset。你处理的消息的时候,调用add_commit_record 添加KafkaRecord的offset(即使有重复也没有关系,内部会取最大值),这样不方便吗?

因为我是批量处理的,想要把当前处理的最后的 topic partition offset 提前保存下来,然后等任务 task 都执行完成后再执行 commit task 提交偏移量。

目前就基本只能在读完 kafka 消息之后去创建 commit task,感觉不是很方便。

@hiberabyss
Copy link
Author

在Kafka 0.9前,Committed Offset信息保存在zookeeper的[consumers/{group}/offsets/{topic}/{partition}]目录中 Offset信息保存在zookeeper的;而在0.9之后,所有的offset信息都保存在了Broker上的一个名为__consumer_offsets的topic中。

目前设计的是commit的时候使用KafkaRecord(消息)的offset,如果有消息有重复的toppar,会取最大的offset。你处理的消息的时候,调用add_commit_record 添加KafkaRecord的offset(即使有重复也没有关系,内部会取最大值),这样不方便吗?

另外有什么比较方便的办法去获取 topic 的最新 offset 么?想要去监控下当前消费到的 offset 和系统最新 offset 的偏移量,偏移量过大的话发出报警

@wzl12356
Copy link
Contributor

wzl12356 commented Sep 3, 2021

在Kafka 0.9前,Committed Offset信息保存在zookeeper的[consumers/{group}/offsets/{topic}/{partition}]目录中 Offset信息保存在zookeeper的;而在0.9之后,所有的offset信息都保存在了Broker上的一个名为__consumer_offsets的topic中。
目前设计的是commit的时候使用KafkaRecord(消息)的offset,如果有消息有重复的toppar,会取最大的offset。你处理的消息的时候,调用add_commit_record 添加KafkaRecord的offset(即使有重复也没有关系,内部会取最大值),这样不方便吗?

因为我是批量处理的,想要把当前处理的最后的 topic partition offset 提前保存下来,然后等任务 task 都执行完成后再执行 commit task 提交偏移量。

目前就基本只能在读完 kafka 消息之后去创建 commit task,感觉不是很方便。

这个问题,我考虑一下,看看怎么添加一下

@wzl12356
Copy link
Contributor

wzl12356 commented Sep 3, 2021

在Kafka 0.9前,Committed Offset信息保存在zookeeper的[consumers/{group}/offsets/{topic}/{partition}]目录中 Offset信息保存在zookeeper的;而在0.9之后,所有的offset信息都保存在了Broker上的一个名为__consumer_offsets的topic中。
目前设计的是commit的时候使用KafkaRecord(消息)的offset,如果有消息有重复的toppar,会取最大的offset。你处理的消息的时候,调用add_commit_record 添加KafkaRecord的offset(即使有重复也没有关系,内部会取最大值),这样不方便吗?

另外有什么比较方便的办法去获取 topic 的最新 offset 么?想要去监控下当前消费到的 offset 和系统最新 offset 的偏移量,偏移量过大的话发出报警

这个一般应该可以通过外部的监控topic的消费情况,如果偏移量过大可以报警,我记得kafka的bin目录里面有脚本可以获取group的消费以及delay的数据。

@hiberabyss
Copy link
Author

问下现在 api=meta 的作用是什么呀?能增加一个新的 api 支持获取最新的 offset 和对应的 timestamp 么?

@wzl12356
Copy link
Contributor

wzl12356 commented Sep 8, 2021

meta是得到topic的broker集群信息的,比如分片,ip等,不过目前没有对外提供访问的数据接口。
获取最新的 offset 和对应的 timestamp 和 commit的接口问题,我下个版本试着增加一下。

@hiberabyss
Copy link
Author

在非消费者组的模式下,offset_timestamp应该在conf里面设置,示例如下:

KafkaConfig conf;

conf.set_offset_timestamp(xxx);

task->set_config(std::move(conf));

目前由于conf是针对task设置,因此设置的这个timestamp是这个task公用的

想问下如果同时设置了 toppar 里的 offset 和 conf 里的 timestamp 的话,是不是 timestamp 的优先级更高呢?

@wzl12356
Copy link
Contributor

timestamp只在第一次toppar里面取不到合法的offset的时候生效(比如:第一次消费的时候),后面都是toppar里面的offset

@hiberabyss
Copy link
Author

hiberabyss commented Oct 19, 2021

timestamp只在第一次toppar里面取不到合法的offset的时候生效(比如:第一次消费的时候),后面都是toppar里面的offset

感谢你的回复!但我现在发现设置 conf.set_offset_timestamp(1634642549); 到某个特定的时间戳还是没法生效,最终还是直接取的最久的 offset 消息,没有从这个时间戳开始往后消费消息。

@wzl12356
Copy link
Contributor

timestamp只在第一次toppar里面取不到合法的offset的时候生效(比如:第一次消费的时候),后面都是toppar里面的offset

感谢你的回复!但我现在发现设置 conf.set_offset_timestamp(1634642549); 到某个特定的时间戳还是没法生效,最终还是直接取的最久的 offset 消息,没有从这个时间戳开始往后消费消息。

kafka 版本还是2.2是吧?我来找一下原因

@hiberabyss
Copy link
Author

timestamp只在第一次toppar里面取不到合法的offset的时候生效(比如:第一次消费的时候),后面都是toppar里面的offset

感谢你的回复!但我现在发现设置 conf.set_offset_timestamp(1634642549); 到某个特定的时间戳还是没法生效,最终还是直接取的最久的 offset 消息,没有从这个时间戳开始往后消费消息。

kafka 版本还是2.2是吧?我来找一下原因

嗯,对的,用的 workflow 版本是 v0.9.8

@wzl12356
Copy link
Contributor

timestamp只在第一次toppar里面取不到合法的offset的时候生效(比如:第一次消费的时候),后面都是toppar里面的offset

感谢你的回复!但我现在发现设置 conf.set_offset_timestamp(1634642549); 到某个特定的时间戳还是没法生效,最终还是直接取的最久的 offset 消息,没有从这个时间戳开始往后消费消息。

单位应该是ms,因此应该写成conf.set_offset_timestamp(1634642549000L);

@hiberabyss
Copy link
Author

hiberabyss commented Oct 20, 2021

@wzl12356 感谢,换成 ms 之后确实是取的这个时间点之后的数据;但现在如果指定的时间点之后没有新数据,好像一直不会有 callback ,等有新的数据之后才会执行到 callback 那里,能有办法直接返回么?

@Barenboim
Copy link
Contributor

@wzl12356 感谢,换成 ms 之后确实是取的这个时间点之后的数据;但现在如果指定的时间点之后没有新数据,好像一直不会有 callback ,等有新的数据之后才会执行到 callback 那里,能有办法直接返回么?

你试下KafkaConfig里的set_fetch_timeout,把这个值设为0

@hiberabyss
Copy link
Author

设置之后没有效果哈,还是会一直等待新数据。

@wzl12356
Copy link
Contributor

目前的逻辑是,如果获取不到合法的offset会一直重试执行listoffset,直到获取合法的offset为止。
我想一下这个地方如何修改。

@hiberabyss
Copy link
Author

目前的逻辑是,如果获取不到合法的offset会一直重试执行listoffset,直到获取合法的offset为止。 我想一下这个地方如何修改。

能否换成没有新数据的话用当前最新的 offset 值呢?

@wzl12356
Copy link
Contributor

可以,我仔细考虑一下

@wzl12356
Copy link
Contributor

@hiberabyss
#610

@hiberabyss
Copy link
Author

@hiberabyss #610

感谢!能请教下现在没有新数据时 offset 是怎么更新的呢?

@wzl12356
Copy link
Contributor

wzl12356 commented Oct 27, 2021

@hiberabyss #610

感谢!能请教下现在没有新数据时 offset 是怎么更新的呢?

如果设置的timestamp太靠后,会用最新的offset,获取最新的一条数据

@hiberabyss
Copy link
Author

基于新版本的 workflow 测试时,这里的功能无法生效;当通过 KafkaConfig 设置较新的 timestamp 时 (在新的时间点没有产生消息),读消息会一直卡在那里,而不是返回最新的 offset .

测试的 Workflow 版本: "400a22f8b9e304bba1a51e66207efd10f9c04b27"

@wzl12356

@wzl12356
Copy link
Contributor

#610 这个版本之前你验证过吗,当时它可以生效吗?

@hiberabyss
Copy link
Author

#610 这个版本之前你验证过吗,当时它可以生效吗?

当时是可以生效的

@wzl12356
Copy link
Contributor

我查一下原因

@wzl12356
Copy link
Contributor

#754

@Barenboim
Copy link
Contributor

@hiberabyss
master分支已经修复。

@hiberabyss
Copy link
Author

@hiberabyss master分支已经修复。

感谢!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants