Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#6549 Fix kafka consumer initial seek position #6712

Merged
merged 5 commits into from
Jul 11, 2022

Conversation

planetf1
Copy link
Member

@planetf1 planetf1 commented Jul 8, 2022

Signed-off-by: Nigel Jones nigel.l.jones+git@gmail.com

Description

Corrects an issue where Cohort registration events could be missed due to the delay between the kafka topic connector being started (and running asynchronously) and the first poll() being done to receive messages in the case where no topic offset was stored (ie new consumer/install).

Detail of fix

By default Kafka will store the offset that a client last read from for it's consumer group (this should be constant for a persistent server).
Sometimes there is no offset stored. In this case it will default to the 'last' message (kafka properties can set it to first instead)

Unfortunately this setting of 'last' message is done a number of seconds after the Kafka topic connector has started -- not until we get to the first poll() which is typically around 5s later. The result is that we miss messages in this window. In a production environment this probably won't be noticed, but it occurs consistently in our lab/test environments and CTS (before a random 'sleep' was added)

This fix hooks off the 'PartitionAssignment' event -- until then we don't know what offset kafka thinks it wants to use.

We look at the ~ the time the connector was started (actually the constructor was called), and save this as our latest desired read time. When we get the event above IF the current offset is later than this we rewind a little - and this is BEFORE any messages are read by a poll(). We only attempt this once. If anything goes wrong we revert to the default behaviour, and we also only do this on the first partition assignment, since after that kafka should be managing the offset correctly.

Related Issue(s)

Fixes: #6549

Testing

Tested with coco Pharma environment on k8s
verified offset is corrected if older ie

2022-07-08 12:12:23.901  INFO 1 --- [pool-2-thread-1] o.a.e.t.k.KafkaOpenMetadataEventConsumer : Received initial rebalance event
2022-07-08 12:12:23.937  INFO 1 --- [pool-2-thread-1] o.a.e.t.k.KafkaOpenMetadataEventConsumer : Seeking to 19 for partition 0 and topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic.registration as current offset 25 is too late
2022-07-08 12:12:23.973 DEBUG 1 --- [pool-2-thread-1] o.a.e.t.k.KafkaOpenMetadataEventConsumer : Found records: 6
2022-07-08 12:12:23.973 DEBUG 1 --- [pool-2-thread-1] o.a.e.t.k.KafkaOpenMetadataEventConsumer : Received message: {"class":"OMRSEventV1","protocolVersionId":"OMRS V1.0","timestamp":1657282337878,"originator":{"serverName":"cocoMDS4","serverType":"Metadata Access Point","organizationName":"Coco Pharmaceuticals"},"eventCategory":"REGISTRY","registryEventSection":{"registryEventType":"REFRESH_REGISTRATION_REQUEST"}}

NOTE currently testing the case where we restart a server and consumer group exists

Release Notes & Documentation

Will add ref to release notes

Additional notes

  • No audit events added since there is no action for the user, and in general we will recover from missing events later.. ie by further cohort changes, potential future heartbeats, and the issue only occurs in a small time window
  • Log entries left in at INFO to add in any future debugging
  • Plan to cherry-pick to release 2.10
  • Any reviews for changes to use of log/audit - suggest change master only
  • This file is 'contaminated' with windows ^M characters. I would fix, but this would make the diff hard to see - may raise a separate issue for this as most of our source is in Unix format.

Signed-off-by: Nigel Jones <nigel.l.jones+git@gmail.com>
@planetf1 planetf1 requested review from lpalashevski, mandy-chessell and davidradl and removed request for mandy-chessell July 8, 2022 12:45
@planetf1
Copy link
Member Author

planetf1 commented Jul 8, 2022

Example log with the info messages - covering multiple topics in the same threads, with a mix of threads without messages, and some with

2022-07-08 17:48:45.145  INFO 1 --- [ool-26-thread-1] o.a.e.t.k.KafkaOpenMetadataEventConsumer : Received initial PartitionsAssigned event
2022-07-08 17:48:45.145  INFO 1 --- [ool-26-thread-1] o.a.e.t.k.KafkaOpenMetadataEventConsumer : Querying for offset by timestamp: 1657302518936
2022-07-08 17:48:45.171  INFO 1 --- [ool-26-thread-1] o.a.e.t.k.KafkaOpenMetadataEventConsumer : Earliest offset found for 1657302518936 is 1657302520053
2022-07-08 17:48:45.186  INFO 1 --- [ool-26-thread-1] o.a.e.t.k.KafkaOpenMetadataEventConsumer : Seeking to 20 for partition 0 and topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic.registration as current offset 26 is too late
2022-07-08 17:48:45.188  INFO 1 --- [ool-30-thread-1] o.a.e.t.k.KafkaOpenMetadataEventConsumer : Received initial PartitionsAssigned event
2022-07-08 17:48:45.188  INFO 1 --- [ool-30-thread-1] o.a.e.t.k.KafkaOpenMetadataEventConsumer : Querying for offset by timestamp: 1657302519124
2022-07-08 17:48:45.195  INFO 1 --- [ool-28-thread-1] o.a.e.t.k.KafkaOpenMetadataEventConsumer : Received initial PartitionsAssigned event
2022-07-08 17:48:45.195  INFO 1 --- [ool-28-thread-1] o.a.e.t.k.KafkaOpenMetadataEventConsumer : Querying for offset by timestamp: 1657302519008
2022-07-08 17:48:45.204 DEBUG 1 --- [ool-26-thread-1] o.a.e.t.k.KafkaOpenMetadataEventConsumer : Found records: 6
2022-07-08 17:48:45.204  INFO 1 --- [ool-28-thread-1] o.a.e.t.k.KafkaOpenMetadataEventConsumer : No missed events found for partition 0 and topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic.types
2022-07-08 17:48:45.205  INFO 1 --- [ool-30-thread-1] o.a.e.t.k.KafkaOpenMetadataEventConsumer : No missed events found for partition 0 and topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic.instances
2022-07-08 17:48:45.365  INFO 1 --- [ool-32-thread-1] o.a.e.t.k.KafkaOpenMetadataEventConsumer : Received initial PartitionsAssigned event
2022-07-08 17:48:45.365  INFO 1 --- [ool-32-thread-1] o.a.e.t.k.KafkaOpenMetadataEventConsumer : Querying for offset by timestamp: 1657302519216
2022-07-08 17:48:45.366  INFO 1 --- [ool-34-thread-1] o.a.e.t.k.KafkaOpenMetadataEventConsumer : Received initial PartitionsAssigned event
2022-07-08 17:48:45.366  INFO 1 --- [ool-34-thread-1] o.a.e.t.k.KafkaOpenMetadataEventConsumer : Querying for offset by timestamp: 1657302519301
2022-07-08 17:48:45.369  INFO 1 --- [ool-32-thread-1] o.a.e.t.k.KafkaOpenMetadataEventConsumer : No missed events found for partition 0 and topic egeria.omag.server.cocoMDS4.omas.assetconsumer.outTopic
2022-07-08 17:48:45.390  INFO 1 --- [ool-34-thread-1] o.a.e.t.k.KafkaOpenMetadataEventConsumer : No missed events found for partition 0 and topic egeria.omag.server.cocoMDS4.omas.communityprofile.outTopic
2022-07-08 17:48:45.495 DEBUG 1 --- [ool-14-thread-1] o.a.e.t.k.KafkaOpenMetadataEventConsumer : Found records: 0
2022-07-08 17:48:45.496 DEBUG 1 --- [pool-2-thread-1] o.a.e.t.k.KafkaOpenMetadataEventConsumer : Found records: 0
2022-07-08 17:48:45.899 DEBUG 1 --- [ool-16-thread-1] o.a.e.t.k.KafkaOpenMetadataEventConsumer : Found records: 0
2022-07-08 17:48:45.900 DEBUG 1 --- [ool-18-thread-1] o.a.e.t.k.KafkaOpenMetadataEventConsumer : Found records: 0

Signed-off-by: Nigel Jones <nigel.l.jones+git@gmail.com>
…ts on topic ->null

Signed-off-by: Nigel Jones <nigel.l.jones+git@gmail.com>
Signed-off-by: Nigel Jones <nigel.l.jones+git@gmail.com>
Signed-off-by: Nigel Jones <nigel.l.jones+git@gmail.com>
@planetf1 planetf1 enabled auto-merge July 11, 2022 07:52
@planetf1 planetf1 merged commit 64fd229 into odpi:master Jul 11, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[BUG] Problem registering governance engine / Asset consumer exceptions
2 participants