
Lazy consumer group creation causes systematic loss of produced records before the first polling operation #464

Closed
codeJack opened this issue Aug 27, 2020 · 4 comments

Comments

@codeJack
Contributor

Context

This might be working as designed, but I think it is worth discussing.

I'm using strimzi-kafka-bridge for integration testing purposes; more precisely, I have a simple event-driven microservice - Kafka Streams based - with a basic source-processor-sink topology running in a cluster, and I want to test that producing a given input record against the source results in a certain output record produced against the sink.

Versions (to be tested against a more recent version)

  • Strimzi 0.16.2
  • Bridge 0.15.0

Steps to reproduce

The Python code snippets are not exhaustive, but purely informative, to clearly highlight the bridge APIs being consumed.
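For reference, the snippets below share a setup along these lines. This is a minimal sketch; the bridge host/port and the naming scheme are assumptions, not taken from the original report:

import uuid
import requests

# Base URL of the Strimzi HTTP bridge (assumed; adjust for your deployment).
bridge_url = "http://localhost:8080"

# Randomly generated name, used as both the consumer group and the instance name.
consumer_name = f"it-consumer-{uuid.uuid4()}"

# Shared HTTP session for all calls against the bridge.
s = requests.Session()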

  1. Create the consumer group; consumer_name is a randomly generated name.
r = s.post(f"{bridge_url}/consumers/{consumer_name}",
           json={
               "name": consumer_name,
               "format": "binary",
               "enable.auto.commit": True,
               "auto.offset.reset": "latest"
           },
           headers={"Content-Type":"application/vnd.kafka.v2+json"})
  2. Add a subscription to the newly created consumer group for sink-topic
r = s.post(f"{bridge_url}/consumers/{consumer_name}/instances/{consumer_name}/subscription",
           json={
               "topics": [ "sink-topic" ]
           },
           headers={"Content-Type":"application/vnd.kafka.v2+json"})
  3. Produce one or more record(s) against source-topic.
r = s.post(f"{bridge_url}/topics/source-topic",
           json={
               "records" : [ {
                   "key" : "MTIzCg==",
                   "value" : "CgR0ZXN0"
           }]}, 
           headers={"Content-Type":"application/vnd.kafka.binary.v2+json"})
  4. Sleep a certain amount of time to ensure that the output record has been created by the service.
    No matter how long you sleep here, the issue persists.

  5. Consume records to assert that the output record is what we expected.

r = s.get(f"{bridge_url}/consumers/{consumer_name}/instances/{consumer_name}/records",
          headers={"Accept": "application/vnd.kafka.binary.v2+json"})

The response is empty.

Please note that I have also tried producing and consuming against the very same topic (sink-topic == source-topic) to completely isolate the issue from any dependency it might have on my own service, and it is still reproducible.

Workaround

  • Step 1 - Create consumer group
  • Step 2 - Create subscription to topic for the newly created consumer group

BEGIN workaround steps

  • Step 5 - Consume records, even if auto.offset.reset is latest and nothing has been produced yet, simply to fully initialize the consumer (takes approximately 3s, probably due to the lazy initialization); see the sketch after these steps
  • Sleep 1s - apparently the lazily created consumer still needs some time after the first polling operation to be ready; 1s was enough in the testing I conducted

END workaround steps

  • Step 3 - Produce against source-topic
  • Step 4 - Wait for the service to do its job
  • Step 5 - Consume against sink-topic. The response now contains the output record.
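For clarity, here is a minimal sketch of the full workaround sequence in the same style as the snippets above; the sleep durations are empirical values from this report, not API guarantees, and the 5s service wait is purely illustrative:

import time

# Initialization poll: this is what actually triggers the join-group on the
# broker side. With auto.offset.reset=latest it returns no records; it only
# fixes the consumer's position at the current end of the topic.
s.get(f"{bridge_url}/consumers/{consumer_name}/instances/{consumer_name}/records",
      headers={"Accept": "application/vnd.kafka.binary.v2+json"})

# Empirically, the consumer needs a little more time after the first poll.
time.sleep(1)

# Produce against source-topic, as in step 3.
s.post(f"{bridge_url}/topics/source-topic",
       json={"records": [{"key": "MTIzCg==", "value": "CgR0ZXN0"}]},
       headers={"Content-Type": "application/vnd.kafka.binary.v2+json"})

# Wait for the service under test to produce its output (illustrative value).
time.sleep(5)

# Consume against sink-topic: the response now contains the output record.
r = s.get(f"{bridge_url}/consumers/{consumer_name}/instances/{consumer_name}/records",
          headers={"Accept": "application/vnd.kafka.binary.v2+json"})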

Conclusion / Proposal

It might be of common interest to expose a synchronous flavor of the API - as far as consumer group subscription is concerned - to smoothly cover scenarios like the one described here.

@ppatierno
Member

ppatierno commented Aug 31, 2020

This has already been discussed in the past, more or less.
The bridge exposes the Kafka API, but over HTTP, so think of your native "Java" Kafka client and how it works, just over HTTP. Also, think about it at the protocol level.
When a Kafka consumer subscribes to a topic, nothing is done against the broker (on the wire); the topics to subscribe to are just saved internally in the consumer client, and it doesn't join the consumer group yet.
Joining the consumer group only happens when the consumer executes a poll; only at that time is a join-group message sent.
It's not a "lazy" creation; it's just how the protocol works.
A typical Kafka application polls in a loop, so the first poll serves to join the consumer group, while the next ones fetch the available messages.
Anyway, in your specific case, in order to get the messages produced before the consumer joins, shouldn't you use "auto.offset.reset": "earliest"?
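In terms of the snippet from step 1, that would just mean changing the reset policy when creating the consumer:

r = s.post(f"{bridge_url}/consumers/{consumer_name}",
           json={
               "name": consumer_name,
               "format": "binary",
               "enable.auto.commit": True,
               "auto.offset.reset": "earliest"
           },
           headers={"Content-Type": "application/vnd.kafka.v2+json"})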

@codeJack
Contributor Author

codeJack commented Aug 31, 2020

Thanks for the comment @ppatierno.
Indeed, it makes sense when looking at it from a Kafka protocol standpoint - "The bridge exposes the Kafka API, but over HTTP, so think of your native "Java" Kafka client and how it works, just over HTTP. Also, think about it at the protocol level".

"auto.offset.reset": "earliest" would do as well, but it does imply consuming the whole history of events; for my own use-case polling once before producing to fix the latest offset in the consumer group seems more convenient.

I guess we can close this one then, or do you think it can be useful for tracking purposes?

@ppatierno
Member

@codeJack I have one more question ...

"auto.offset.reset": "earliest" would do as well, but it does imply consuming the whole history of events

You are right here, but at this point I had missed your use case.

The "auto.offset.reset" policy is applied only when there is no committed offset for the consumer group where the consumer is joining.
It means that if you send messages with producer, then you join the consumer group with new random generated consumer_name (which is the group name) and policy "latest", after doing a poll for joining and a poll for getting messages ... it's normal not getting any messages (it will get the new ones only).
Instead, if you create the consumer with an already existing group name, you should get the messages sent before by the producer (because a committed offset already exists but it's behind the latest sent message offset); in this case the "auto.offset.reset" doesn't apply.

Of course, if you join the group with a poll before producing (as you mentioned), you are actually not using any kind of reset policy but just "waiting" for messages and consuming them as they came.
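To illustrate the committed-offset case, here is a sketch that produces and consumes against the same topic (as in the reporter's isolated repro); existing_group and instance are hypothetical names, and the group is assumed to already have committed offsets from an earlier run:

# Hypothetical group that has already consumed and committed offsets.
existing_group = "my-service-test-group"
instance = "instance-1"

# Produce while no consumer instance is running.
s.post(f"{bridge_url}/topics/source-topic",
       json={"records": [{"key": "MTIzCg==", "value": "CgR0ZXN0"}]},
       headers={"Content-Type": "application/vnd.kafka.binary.v2+json"})

# Re-create a consumer under the existing group. Because a committed offset
# already exists, auto.offset.reset does not apply: the consumer resumes
# from that offset and picks up the record produced above.
s.post(f"{bridge_url}/consumers/{existing_group}",
       json={"name": instance, "format": "binary",
             "enable.auto.commit": True, "auto.offset.reset": "latest"},
       headers={"Content-Type": "application/vnd.kafka.v2+json"})
s.post(f"{bridge_url}/consumers/{existing_group}/instances/{instance}/subscription",
       json={"topics": ["source-topic"]},
       headers={"Content-Type": "application/vnd.kafka.v2+json"})

# First poll joins the group; a second poll returns the backlog.
for _ in range(2):
    r = s.get(f"{bridge_url}/consumers/{existing_group}/instances/{instance}/records",
              headers={"Accept": "application/vnd.kafka.binary.v2+json"})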

@ppatierno
Member

@codeJack going to close this one because I haven't heard from you in years. If anything still needs to be discussed, feel free to re-open it.
