
Add support for exactly-once semantics in the source connector #142

Open · wants to merge 2 commits into master
Conversation

@tysonmote commented Aug 6, 2023

This PR adds support for exactly-once semantics in the source connector, a capability described in KIP-618 and shipped in Kafka Connect 3.3.0.
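
At a high level, opting into KIP-618 means the connector advertises exactly-once support through the SourceConnector hooks added in Kafka Connect 3.3. As a rough sketch (the API names below are the real ones from org.apache.kafka.connect.source, but the actual changes in this PR may be more involved), the additions to the existing MongoSourceConnector class look something like:

import java.util.Map;
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.ExactlyOnceSupport;

@Override
public ExactlyOnceSupport exactlyOnceSupport(final Map<String, String> config) {
    // Advertise that this connector can provide exactly-once delivery
    // when the worker enables it.
    return ExactlyOnceSupport.SUPPORTED;
}

@Override
public ConnectorTransactionBoundaries canDefineTransactionBoundaries(final Map<String, String> config) {
    // The connector does not define its own transaction boundaries;
    // the worker's poll/interval-based boundaries are used instead.
    return ConnectorTransactionBoundaries.UNSUPPORTED;
}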

To test this PR beyond the existing test suite, I created a little chaos tester: https://github.com/tysonmote/mongo-kafka-chaos This repo stands up a MongoDB-to-Kafka pipeline using the MongoDB Kafka Connector and then randomly SIGKILLs containers to confirm (more or less) that exactly-once semantics hold.

Before this PR, killing Kafka brokers randomly would produce duplicate messages fairly easily:

./chaos.sh -i 60 kafka1,kafka2,kafka3

[...]
consumer  | Events consumed: 338,221 (2,539/s)
consumer  | Events consumed: 345,351 (374/s)
consumer  | ERROR: Got event 345350 after 345353 (out of order)
consumer  | ERROR: Duplicate event 345350
consumer  | ERROR: Duplicate event 345351
consumer  | ERROR: Duplicate event 345352
consumer  | ERROR: Duplicate event 345353
consumer  | Events consumed: 402,445 (5,709/s)
consumer  | Events consumed: 427,805 (2,536/s)

With this PR, Kafka Connect shows that it is using ExactlyOnceWorkerSourceTask:

connect  | [2023-08-06 22:14:11,876] INFO Started MongoDB source task (com.mongodb.kafka.connect.source.MongoSourceTask)
connect  | [2023-08-06 22:14:11,876] INFO ExactlyOnceWorkerSourceTask{id=chaos-source-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
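
(For context, the ExactlyOnceWorkerSourceTask path is only taken when exactly-once is enabled on the worker and requested for the connector. The relevant KIP-618 settings look roughly like the following; the exact values used in the chaos-test repo may differ.)

# Distributed worker config (Kafka Connect 3.3+): permit exactly-once source tasks.
exactly.once.source.support=enabled

# Connector config: fail preflight validation if exactly-once cannot be guaranteed,
# and commit one producer transaction per poll() batch.
exactly.once.support=required
transaction.boundary=poll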

And I'm unable to find duplicate (or even out-of-order) messages when killing Kafka brokers or the Connect worker randomly:

./chaos.sh -i 60 kafka1,kafka2,kafka3

[...]
consumer  | Events consumed: 2,230,041 (470/s)
consumer  | Events consumed: 2,298,021 (6,791/s)
consumer  | Events consumed: 2,310,926 (604/s)
consumer  | Events consumed: 2,363,276 (5,235/s)
consumer  | Events consumed: 2,385,986 (2,268/s)
consumer  | Events consumed: 2,398,847 (552/s)
consumer  | Events consumed: 2,467,306 (6,837/s)
./chaos.sh -i 60 connect

[...]
consumer  | Events consumed: 2,557,907 (13,361/s)
consumer  | Events consumed: 2,571,181 (2,632/s)
consumer  | Events consumed: 2,572,361 (33/s)
consumer  | Events consumed: 2,574,201 (368/s)
consumer  | Events consumed: 2,576,131 (384/s)
consumer  | Events consumed: 2,576,881 (149/s)
consumer  | Events consumed: 2,627,996 (10,223/s)
consumer  | Events consumed: 2,688,458 (11,915/s)
consumer  | Events consumed: 2,726,518 (7,577/s)
consumer  | Events consumed: 2,753,578 (5,323/s)

The above chaos tester is, of course, not very comprehensive. I haven't tested multi-node failures or networking issues; just arbitrary crashes using SIGKILL. The MongoDB writer is also not particularly high-throughput. I simply wrote it to model a use case that we have at Rippling, writing small batches of records in transactions.

@tysonmote tysonmote changed the title Add support for exactly-once semantics Add support for exactly-once semantics in the source connector Aug 6, 2023