EoS #200 idempotent producer #203
…mpotent producer
const logger = rootLogger.namespace('Producer')
const sendMessages = createSendMessages({ logger, cluster, partitioner })
const transactionManager = createTransactionManager({ logger, cluster })
const sendMessages = createSendMessages({ logger, cluster, partitioner, transactionManager })
Currently I pass `transactionManager` to the factory method. I do wonder if ultimately it will make more sense to provide a transaction manager in the `sendMessages` call. That could also allow us to make the transaction manager less stateful. Currently it doesn't really matter, but it may be something to refactor in #173.
In what way would it make the transaction manager less stateful? Maybe I'm missing something, but passing it into the factory method seems much more appropriate than passing it with each sendMessage call.
You're probably right @Nevon, it's just that once we introduce transactions the transaction manager will become a bit of a state machine, and I was trying to think of strategies to minimize the state we have to manage.
One way of minimizing that would be to have the transaction manager fetch the producer id on instantiation, so that there's no ambiguity about whether the idempotent producer has already been initialized. Since that fetch can only occur after connection, that would probably mean passing the transaction manager into the send messages method, or lazily creating the sendMessages instance.
Ultimately I do think what I've implemented is the most straightforward solution, with the benefit that it closely mirrors how transactions are managed in other Kafka libraries. Thus far in practice I haven't had an issue working with the state machine (see preview here), I'm just thinking through the alternatives.
src/producer/sendMessages.js
Outdated
const size = topicMetadata.get(topicName).messagesPerPartition[partition].length
const previous = topicMetadata.get(topicName).sequencePerPartition[partition]

transactionManager.updateSequence(topicName, partition, previous + size)
The sequence number is an `int32`; it will eventually reach the max number. We will need to rotate the number, similar to what I do with the correlation id.
kafkajs/src/network/connection.js
Lines 316 to 322 in 095b6e0
nextCorrelationId() {
  if (this.correlationId === Number.MAX_VALUE) {
    this.correlationId = 0
  }
  return this.correlationId++
}
WDYT?
We should also move this logic to the transaction manager:
transactionManager.updateSequence({ topicName, partition, previousSequence })
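A minimal sketch of what moving the sequence bookkeeping into the transaction manager could look like. The names `createSequenceTracker`, `getSequence`, and the `increment` argument are hypothetical and not taken from the PR; the sketch only shows a per topic-partition counter and leaves out the int32 rollover discussed above.

```javascript
// Hypothetical sketch: per topic-partition sequence bookkeeping.
// None of these names are confirmed KafkaJS API.
function createSequenceTracker() {
  // topic name -> (partition -> next sequence number)
  const sequences = new Map()

  // Returns the current sequence for a topic-partition, defaulting to 0.
  const getSequence = (topicName, partition) => {
    const partitions = sequences.get(topicName)
    return partitions && partitions.has(partition) ? partitions.get(partition) : 0
  }

  // Advances the sequence by the number of messages just sent.
  const updateSequence = (topicName, partition, increment) => {
    if (!sequences.has(topicName)) {
      sequences.set(topicName, new Map())
    }
    const next = getSequence(topicName, partition) + increment
    sequences.get(topicName).set(partition, next)
    return next
  }

  return { getSequence, updateSequence }
}
```

With this shape the caller would only report how many messages it sent per partition, and the `previous + size` arithmetic would live in one place.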
Sounds good to me! Worth noting that the Java implementation appears not to bother with this check (will implement nonetheless): https://github.com/apache/kafka/blob/9a0ea25fee85837748145d37c69cf4d9bb7f9933/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L417
@tulios Also, `Number.MAX_VALUE` is significantly larger than the maximum (signed) Int32 value (roughly 2 ^ 1024 vs 2 ^ 31 - 1). AFAIK JS has no predefined constant for Int32, so I'll need to define my own.
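As a rough illustration of the rollover being discussed, here is one way to define the constant and wrap the sequence. `INT_32_MAX_VALUE` and `nextSequence` are assumed names for this sketch, not the identifiers used in the PR, and wrapping back to 0 after the maximum is an assumption about the desired behavior.

```javascript
// Largest signed 32-bit integer: 2^31 - 1 = 2147483647.
// Hypothetical constant name; JS has no built-in equivalent.
const INT_32_MAX_VALUE = Math.pow(2, 31) - 1

// Returns the sequence after adding `increment`, wrapping past the
// int32 maximum back to 0 instead of overflowing.
function nextSequence(current, increment) {
  if (current > INT_32_MAX_VALUE - increment) {
    // Steps left until the max, then one step to wrap to 0, then the rest.
    return increment - (INT_32_MAX_VALUE - current) - 1
  }
  return current + increment
}
```

Comparing against `INT_32_MAX_VALUE - increment` avoids computing an intermediate sum beyond the int32 range, which matters if the value is later written with a 32-bit encoder.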
const broker = await cluster.findControllerBroker()
const result = await broker.initProducerId({ transactionTimeout })

producerId = result.producerId
Shouldn't we preserve this across reconnections?
@tulios Yes I think we should, good point! I'll update the producer so that it only calls `initProducerId` if uninitialized.
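A hedged sketch of the "only if uninitialized" guard, so the producer id survives reconnections. `createProducerIdHolder`, `NO_PRODUCER_ID`, `isInitialized`, and the default timeout are hypothetical; only `cluster.findControllerBroker()` and `broker.initProducerId({ transactionTimeout })` come from the diff above.

```javascript
// Sentinel meaning "no producer id fetched yet" (assumed value).
const NO_PRODUCER_ID = -1

// Hypothetical holder that fetches the producer id at most once.
function createProducerIdHolder({ cluster, transactionTimeout = 60000 }) {
  let producerId = NO_PRODUCER_ID

  const isInitialized = () => producerId !== NO_PRODUCER_ID

  const initProducerId = async () => {
    // Reuse the existing id on reconnect instead of requesting a new one.
    if (isInitialized()) {
      return producerId
    }
    const broker = await cluster.findControllerBroker()
    const result = await broker.initProducerId({ transactionTimeout })
    producerId = result.producerId
    return producerId
  }

  return { isInitialized, initProducerId }
}
```

Note that this sketch does not guard against two concurrent first calls; a real implementation might need to share the in-flight promise.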
@ianwsperber will you do this on the transactional producer PR or this one? (If you decide for the transactional PR we can merge)
@tulios Forgot about this, just pushed a commit to address
Outstanding work @ianwsperber, I pushed some cosmetic changes. I think it looks good; I just have a couple of questions regarding the sequence number and the init.
Thanks for the great work.
Resolves #200
Add an `idempotent` flag to the producer factory method to support creation of idempotent producers. Documented this as an "experimental" feature, since I haven't had a chance to observe its behavior in practice yet.
Implementation details
- `producer.connect()` call
- `transactionManager` to track sequences across messages and include in `produce` request
- `acks=-1` and `retries>0`. Default retry of MAX_SAFE_INTEGER
This PR additionally includes a few tests verifying the recent work allowing the `firstSequence` option to be provided in record batches.