I added the `restartStreamsOnRebalancing` mode a few months ago to support the use case of transactional producers, in this PR: #427
We have been running this in production since it was merged, and we noticed and investigated an issue where some records were lost from the consumer stream during rebalances.
The reason is that there may be buffered records in `state.bufferedRecords` at the time the rebalance occurs and stops all the partition streams (because we are in `restartStreamsOnRebalancing` mode).
In more detail, what happens is:

1. Somehow a `poll` returns records for partitions with no outstanding request, and these get stored in the `bufferedRecords` state.
2. During the rebalance we call `endRevoked` for all streams, to make sure no records from the previous generation remain in flight after the rebalance (as that would cause an error when committing a transaction later).
3. `endRevoked` does not process the buffered records; it just sets the interruption signal for all the partition streams.
4. After the rebalance, for partitions that remained assigned to the same client, the consumer position has already advanced past the buffered records, so these records will not be fetched again. (For partitions that got assigned to another client the problem does not exist: the buffered records were not committed yet, so they get replayed there.)
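To make the loss mechanism concrete, here is a minimal toy model (illustrative only, not zio-kafka code; all names are made up): the consumer position has already advanced past the buffered offsets, so once the interruption discards the buffer, those offsets fall below the resume position and are never polled again.

```scala
// Toy model of the bug: records buffered at rebalance time are lost when
// the partition stream is interrupted without draining the buffer.
object BufferedLossDemo {
  // `position` is where the consumer will resume fetching; `buffered` holds
  // offsets that poll() already returned but that no stream consumed yet.
  final case class PartitionState(position: Long, buffered: List[Long])

  // endRevoked-style interruption: the stream stops, the buffer is dropped.
  def interruptWithoutDraining(s: PartitionState): PartitionState =
    s.copy(buffered = Nil)

  // After the rebalance the same client resumes fetching from `position`,
  // so any buffered offset below it will never be delivered again.
  def lostOffsets(before: PartitionState): List[Long] = {
    val resumePosition = interruptWithoutDraining(before).position
    before.buffered.filter(_ < resumePosition)
  }
}
```

With position 10 and offsets 7..9 still in the buffer, all three are lost, which matches the behaviour we observed in production.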
Note that the problem does not exist in the "normal" operation mode of zio-kafka, because in that case the partition streams for partitions that remain assigned to the current client are not closed, and the buffered records get used for the next downstream request.
I made a quick fix for this in a forked zio-kafka and have tested it over the past days (with success). It basically just "leaks out" the buffered records to the user's rebalance handler, as a new parameter to `onRevoked`. This way the user can store these additional records and add them to the commit between the restarted partition streams.
(With `restartStreamsOnRebalancing` mode the service logic is basically: consume all streams, do a "final" commit for the previous generation, then start again on the new streams.)
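A rough sketch of the quick fix's shape (hypothetical names, not the actual zio-kafka API): `endRevoked` interrupts the streams as before, but additionally hands the still-buffered records to the user's revoke callback so they can be included in the "final" commit for the previous generation.

```scala
// Hypothetical sketch of the forked quick fix: the revoke handler now also
// receives the records that were buffered when the rebalance hit.
object QuickFixSketch {
  final case class Record(partition: Int, offset: Long)

  // User-supplied handler, extended with the leaked buffered records.
  type OnRevoked = List[Record] => Unit

  def endRevoked(buffered: List[Record], onRevoked: OnRevoked): Unit = {
    // ...interrupt all partition streams as before (elided)...
    // then leak the buffer out so the user can commit these offsets too
    onRevoked(buffered)
  }
}
```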
A probably nicer implementation would be, instead of just stopping the streams in `endRevoked` (in this mode), to first inject the buffered records into them. Then there would be no need to separately pass these last records to the user code: they would just arrive as regular consumed records (though without downstream demand, triggered by the rebalance). This would require replacing the current `onInterrupt(promise)` control of the partition streams with something like merging in a queue, I guess. (I have not implemented this version yet.)
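The queue-based idea could look roughly like this (a speculative sketch with invented names, not a design proposal for the actual Runloop): on revoke, the buffered records are enqueued ahead of the terminal signal, so the partition stream emits them as ordinary elements before ending.

```scala
import scala.collection.mutable

// Speculative sketch of the "nicer" variant: drain the buffer into the
// partition stream's queue before signalling termination, instead of
// interrupting via a promise and dropping the buffer.
object InjectSketch {
  sealed trait Signal
  final case class Records(offsets: List[Long]) extends Signal
  case object End extends Signal

  def drainOnRevoke(queue: mutable.Queue[Signal], buffered: List[Long]): Unit = {
    if (buffered.nonEmpty) queue.enqueue(Records(buffered)) // inject first
    queue.enqueue(End)                                      // then terminate
  }
}
```

The consumer side would then see the buffered records as the stream's last regular elements, so the existing commit logic between generations would cover them with no extra user-facing API.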
Let's discuss the options here before I implement the fix in upstream zio-kafka.
The consumer might assign partitions and return records for them in the same call to `poll`, despite those partitions being paused in the rebalance listener. We would previously die on these records, but now we buffer them.