Retrieve Kafka topic partitions offset #69

Closed
lebaptiste opened this issue Apr 29, 2019 · 12 comments · Fixed by ThreeDotsLabs/watermill-kafka#8
Labels
enhancement (New feature or request), help wanted (Extra attention is needed), module: publisher, module: subscriber

Comments

@lebaptiste
Contributor

I have an API service that needs to consume a Kafka compacted topic before it can be considered ready to handle request traffic. How can I determine its readiness?

In this case, it would be best to know how many messages are still left to process before the consumer has caught up with the latest messages on the topic. There seems to be no way to get this information currently.

I suggest retrieving the partition offsets at the time the consumer subscribes to a topic (for all partitions consumed). An alternative would be to add a field to messages, for example an IsLatest flag, to let the consumer know that a message was the last one at the time of retrieval. Of course the offsets keep growing, so this should not be taken as an absolute indication; it is time sensitive.

The best I can do as a workaround for now is to infer it. I can use the throughput and assume I've reached the end of the topic once the number of messages per second drops significantly (during the catch-up phase the consumer processes as many messages as it can; after that, only as many as the "live" events currently being produced on the topic).

I'd be happy to hear about other alternatives people might have come up with. Thanks.

@roblaszczak
Member

Some questions:

  1. Is this offset check relatively rare? I mean, it will not be done something like 10x/s?
  2. Can you have multiple partitions and consumer groups? A consumer group per service? Or no consumer groups?

Possible solutions:

  1. We can add the message offset to the message context, plus a method to get the topic's last offset (per partition). You can get the offsets at the beginning and wait until you reach them. Does that sound good?
  2. If you are using a consumer group per service, in theory we can try to get information about the consumer group and wait until its offset lag is 0. But that sounds a bit tricky to me ;)
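To make solution 1 concrete, here is a rough sketch of "get the offsets at the beginning and wait until you reach them". It uses only the standard library; ReadinessTracker and OffsetRange are hypothetical names, and the snapshot is assumed to come from whatever the Kafka client exposes (with Sarama, the oldest and newest offsets per partition). Here the newest offset means the offset the next produced message will receive.

```go
package main

// OffsetRange is a per-partition snapshot taken at subscription time.
type OffsetRange struct {
	Oldest int64 // first offset still available in the partition
	Newest int64 // offset the next produced message will get
}

// ReadinessTracker (hypothetical) remembers the last offset that existed in
// each partition when the snapshot was taken and reports when all of them
// have been consumed.
type ReadinessTracker struct {
	targets map[int32]int64 // partition -> last offset that must be seen
}

func NewReadinessTracker(snapshot map[int32]OffsetRange) *ReadinessTracker {
	rt := &ReadinessTracker{targets: make(map[int32]int64)}
	for partition, r := range snapshot {
		if r.Newest > r.Oldest { // skip partitions with nothing to read
			rt.targets[partition] = r.Newest - 1
		}
	}
	return rt
}

// Observe records one consumed message and returns the readiness afterwards.
// It uses >= because offsets in a compacted topic can have gaps.
func (rt *ReadinessTracker) Observe(partition int32, offset int64) bool {
	if target, ok := rt.targets[partition]; ok && offset >= target {
		delete(rt.targets, partition)
	}
	return rt.Ready()
}

// Ready reports whether every partition has reached its snapshot offset.
func (rt *ReadinessTracker) Ready() bool {
	return len(rt.targets) == 0
}
```

The tracker is not safe for concurrent use; a caller that consumes partitions in parallel would need to guard it with a mutex.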

@lebaptiste
Contributor Author

lebaptiste commented Apr 30, 2019

In my scenario

  1. The offset check is there to know whether the service is still catching up, so one call at subscription time would suffice.
  2. The service as a whole has its own consumer group, but it can be split across several containers, each consuming a subset of the topic's partitions (e.g. 1 topic, 12 partitions, 3 consumers consuming 4 partitions each).

I would say solution 1 is preferable and makes sense to me. However, I don't know how consistent Watermill should try to be across all Pub/Sub implementations, as this change would have to be Kafka-specific.

@mateusz-szczyrzyca

@lebaptiste: I don't know whether the current Kafka Go drivers allow you to query how many messages have not been consumed yet: IBM/sarama#489

If you are not using a consumer group, I would suggest keeping the offset in your consumer app and using it to start consuming from that specific offset (instead of from the beginning).

@roblaszczak
Member

@lebaptiste there is no requirement that Pub/Sub implementations be limited to the PubSub interface, we can add some extra methods :) I'm going on holiday now and will be back on Monday. If you have some time, you can try to experiment with adding extra methods to the existing Kafka Pub/Sub for getting these offsets.

@roblaszczak
Member

@lebaptiste I added a Proof of Concept in the kafka-offsets-info branch (#71).

There are:

  • kafka.MessagePartitionFromCtx to get the message's partition from the context
  • kafka.MessagePartitionOffsetFromCtx to get the message's offset from the context
  • kafka.Subscriber.PartitionOffsets to get partition information for the topic

What do you think? :)

You can try it using go get -u github.com/ThreeDotsLabs/watermill@kafka-offsets-info (of course it is not prod ready!).

@roblaszczak roblaszczak added the enhancement New feature or request label May 19, 2019
@kjelle

kjelle commented Aug 29, 2019

> @lebaptiste: I don't know whether the current Kafka Go drivers allow you to query how many messages have not been consumed yet: Shopify/sarama#489
>
> If you are not using a consumer group, I would suggest keeping the offset in your consumer app and using it to start consuming from that specific offset (instead of from the beginning).

As the Sarama author commented in Shopify/sarama#489, it depends on how you store offsets. IIRC Sarama does not do this out of the box.

I store the offsets in Kafka's __consumer_offsets manually, then calculate a "lag" by periodically comparing the offset per consumer_group:topic:partition against OffsetNewest. I run Sarama with its automatic offset commit disabled and manage my offsets manually.
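The lag arithmetic described above could look roughly like the following. This is a sketch with hypothetical names (PartitionState, TotalLag); fetching the committed, oldest and newest offsets from the broker is left out. It assumes the usual Kafka convention that a committed offset is the next offset to read, and that a negative committed value means nothing has been committed yet.

```go
package main

// PartitionState (hypothetical) holds the three offsets needed to compute
// the lag of one consumer_group:topic:partition.
type PartitionState struct {
	Committed int64 // next offset the group will read; negative if none
	Oldest    int64 // first offset still available in the partition
	Newest    int64 // offset the next produced message will get
}

// Lag is the number of offsets between the group's position and the end of
// the partition. It never returns a negative value.
func (p PartitionState) Lag() int64 {
	next := p.Committed
	if next < p.Oldest {
		// Nothing committed yet, or the committed offset has been removed
		// by retention: reading would restart at the oldest offset.
		next = p.Oldest
	}
	if lag := p.Newest - next; lag > 0 {
		return lag
	}
	return 0
}

// TotalLag sums the lag over all partitions; zero means fully caught up.
func TotalLag(parts map[int32]PartitionState) int64 {
	var total int64
	for _, p := range parts {
		total += p.Lag()
	}
	return total
}
```

On a compacted topic the result is an upper bound rather than an exact message count, because compaction leaves gaps in the offset sequence.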

@kjelle

kjelle commented Aug 29, 2019

> I have an API service that needs to consume a Kafka compacted topic before it can be considered ready to handle request traffic. How can I determine its readiness?

@lebaptiste I have a service that loads a compacted topic from the beginning on startup. The service blocks while loading all events from Kafka. I determine when loading is complete by comparing the HighWaterMarkOffsets reported for a partition against the offset state in my app for the same partition. When the offsets in all partitions align, the service has caught up.

In the background I run a goroutine that periodically does basically the same thing, to keep the app in sync with any new events from the compacted topic. I control internal access to this mechanism with a RWLock.
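A minimal sketch of such shared state, guarded by sync.RWMutex (Go's read-write lock). The type and method names are hypothetical, and how the high water marks are obtained from the Kafka client is left out. One simplification is labeled in the code: a partition whose high water mark is above zero but which holds no readable messages would never count as in sync here.

```go
package main

import "sync"

// CatchUpState (hypothetical) is shared between the consumer loop, a
// background refresher goroutine and readiness checks.
type CatchUpState struct {
	mu        sync.RWMutex
	consumed  map[int32]int64 // partition -> next offset the app expects
	highWater map[int32]int64 // partition -> last known high water mark
}

func NewCatchUpState() *CatchUpState {
	return &CatchUpState{
		consumed:  make(map[int32]int64),
		highWater: make(map[int32]int64),
	}
}

// MarkConsumed is called by the consumer loop after applying a message.
func (s *CatchUpState) MarkConsumed(partition int32, offset int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if next := offset + 1; next > s.consumed[partition] {
		s.consumed[partition] = next
	}
}

// SetHighWaterMarks is called periodically by the background goroutine.
func (s *CatchUpState) SetHighWaterMarks(marks map[int32]int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for partition, mark := range marks {
		s.highWater[partition] = mark
	}
}

// InSync takes only the read lock, so many readiness checks can run at once.
// Simplification: it assumes every offset below the high water mark will
// eventually be covered by a consumed message.
func (s *CatchUpState) InSync() bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if len(s.highWater) == 0 {
		return false // no snapshot yet, so nothing can be claimed
	}
	for partition, mark := range s.highWater {
		if s.consumed[partition] < mark {
			return false
		}
	}
	return true
}
```

A background goroutine would typically call SetHighWaterMarks from a time.Ticker loop, while an HTTP readiness handler calls InSync.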

@roblaszczak roblaszczak added the help wanted Extra attention is needed label Sep 16, 2019
@roblaszczak
Member

I already did a Proof of Concept: #71
If someone is interested, you can improve it (and move it to the https://github.com/ThreeDotsLabs/watermill-kafka repository).

@eafzali
Contributor

eafzali commented Oct 29, 2019

Any news on this?
We have exactly the same scenario. I tried to add the offset values to the message context in our custom Marshaler and then retrieve them in a middleware that is responsible for determining the readiness of the service. The problem is that, after unmarshaling the Kafka message, you replace the message context with the subscriber context:

https://github.com/ThreeDotsLabs/watermill-kafka/blob/721c7f628f1b58a1539a2e87c76412500ad20a21/pkg/kafka/subscriber.go#L483-L491

So it didn't work. I could also use metadata, but that involves multiple conversions from int to string and back.
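For reference, the metadata workaround amounts to something like the sketch below. It uses a plain map[string]string (Watermill's message metadata is a string-to-string map); the key names and both function names are made up for illustration.

```go
package main

import "strconv"

// Hypothetical metadata keys, not defined by Watermill.
const (
	partitionMetaKey = "kafka_partition"
	offsetMetaKey    = "kafka_offset"
)

// putOffset stores partition and offset as decimal strings, e.g. from a
// custom unmarshaler.
func putOffset(md map[string]string, partition int32, offset int64) {
	md[partitionMetaKey] = strconv.FormatInt(int64(partition), 10)
	md[offsetMetaKey] = strconv.FormatInt(offset, 10)
}

// getOffset parses the values back, e.g. in a middleware. It returns an
// error when a key is missing or does not hold a valid number.
func getOffset(md map[string]string) (int32, int64, error) {
	partition, err := strconv.ParseInt(md[partitionMetaKey], 10, 32)
	if err != nil {
		return 0, 0, err
	}
	offset, err := strconv.ParseInt(md[offsetMetaKey], 10, 64)
	if err != nil {
		return 0, 0, err
	}
	return int32(partition), offset, nil
}
```

It works, but every message pays for two string conversions in each direction, which is the overhead mentioned above.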

Any advice? Can we work on the context to make sure that any values set on it in the unmarshaling phase are passed along? Are you going to provide built-in functions to retrieve Kafka offset information from the context? I'd be happy to work on it if we can agree on something concrete.

@roblaszczak
Member

Hello @eafzali, I already started to implement it here:
https://github.com/ThreeDotsLabs/watermill/pull/71/files

but I don't know when I will have time to finish that :)

But if you want, it would not require a lot of work to finish it. Basically it needs to be moved to the https://github.com/ThreeDotsLabs/watermill-kafka repository and given some tests (as I remember, it was working more or less).

@eafzali
Contributor

eafzali commented Nov 4, 2019

OK nice, I think I can do that :)
I would also need to add MessageTopicFromCtx, or is there another way to get the message topic in a middleware?

@roblaszczak
Member

@eafzali probably the best way to make it universal would be to add it somewhere in the router :) A PR is also welcome for that! :)


5 participants