Fix dataloss issue in restarting-streams-at-rebalancing mode #473

Merged
vigoo merged 12 commits into zio:master from restarting-mode-dataloss-fix on Sep 23, 2022

Conversation

Contributor

@vigoo vigoo commented May 31, 2022

Fixes #469

@vigoo vigoo requested a review from iravid as a code owner May 31, 2022 08:23
svroonland
svroonland previously approved these changes Jun 1, 2022
Collaborator

@svroonland svroonland left a comment

Looks good! I wonder if we could simplify a bit by not buffering at all but immediately pushing the 'to be buffered' records on the partition stream with this mechanism.

import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord
import zio.stream.Take

case class PartitionStreamControl(
Collaborator

Should it be private?

Contributor Author

vigoo commented Jun 6, 2022

> Looks good! I wonder if we could simplify a bit by not buffering at all but immediately pushing the 'to be buffered' records on the partition stream with this mechanism.

Sounds like a good idea but:

  • It would also affect the "normal mode", so we need to think about whether that mode remains correct.
  • I'm not sure it would keep the processing ordered.

Contributor Author

vigoo commented Jun 7, 2022

Note: I'm still trying to validate this in production and am seeing some problems (not sure yet whether they are related).

Contributor Author

vigoo commented Jun 8, 2022

The problem was that merge introduced a 1-element buffer for each partition (at least in the ZIO 1 version), and that was taking too much memory in our application. As the whole drainQueue was only used at the "end" of the partition stream anyway, I changed the merge to concat, which fixed our memory issue.

With this change, the PR is ready to merge from my end.
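
To illustrate why concatenation suits a queue that is only read at the end of the partition stream, here is a minimal sketch using plain Scala iterators. It is an analogy only, not the zio-kafka code: `ConcatVsMergeSketch`, `drain`, and `concatenated` are hypothetical names, and `Iterator` stands in for the ZIO stream. The point it shows is that with concatenation the drain queue is not touched until the main source is exhausted, and the main source's order is preserved.

```scala
import scala.collection.mutable

object ConcatVsMergeSketch {
  // Hypothetical helper: an iterator that consumes a queue lazily,
  // checking for remaining elements only when asked.
  def drain(queue: mutable.Queue[Int]): Iterator[Int] = new Iterator[Int] {
    def hasNext: Boolean = queue.nonEmpty
    def next(): Int = queue.dequeue()
  }

  // `partitionRecords` plays the role of the live partition stream and
  // `drainQueue` the records left over at its end (both are assumptions).
  // Iterator's `++` takes its argument by name, so the drain side is only
  // created and read after the left-hand side has been fully consumed.
  def concatenated(
      partitionRecords: Iterator[Int],
      drainQueue: mutable.Queue[Int]
  ): Iterator[Int] =
    partitionRecords ++ drain(drainQueue)
}
```

A merge, by contrast, would have to pull from both sides concurrently, which is where a per-source buffer of the kind described above can come from.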

Contributor Author

vigoo commented Jun 22, 2022

Note: one more fix is required for this; I'm validating it in prod now.

Contributor Author

vigoo commented Jul 6, 2022

I think the fix is now complete. I plan to publish a detailed blog post about it that should help explain it.

Contributor Author

vigoo commented Jul 15, 2022

Detailed explanation: https://ziverge.com/blog/zio-kafka-with-transactions-debugging-story

@vigoo vigoo merged commit 1e17e32 into zio:master Sep 23, 2022
@vigoo vigoo deleted the restarting-mode-dataloss-fix branch September 23, 2022 19:03
Successfully merging this pull request may close these issues.

Data loss in restartStreamsOnRebalancing mode