
Documents usage of transactional producer #440

Closed
wants to merge 5 commits

Conversation

erikvanoosten
Collaborator

@erikvanoosten erikvanoosten commented Mar 11, 2022

We have something very similar to the added example code running in production.

@erikvanoosten
Collaborator Author

@iravid I retained the structure of the other example. There is however a way to commit offsets that is slightly more efficient:

  .mapChunksZIO { chunk =>
    val records     = chunk.map(_._1)
    val offsetBatch = chunk.foldLeft(OffsetBatch.empty)(_ merge _._2)

    TransactionalProducer.createTransaction.use { t =>
      t.produceChunkBatch[Any, Int, String](records, Serde.int, Serde.string, offsetBatch)
       .as(Chunk(offsetBatch))
    }
  }
  .aggregateAsync(ZTransducer.foldLeft[OffsetBatch, OffsetBatch](OffsetBatch.empty)(_ merge _))
  .run(ZSink.foreach(_.commit))

This runs committing on another fiber than the one producing the messages, so delays in committing do not affect producing messages. (BTW, it would be great if the needed transducer would become part of the library as well.)
Also, in this fragment I tweaked the creation of offsetBatch slightly so that it does not need an intermediate chunk.

WDYT?

@erikvanoosten
Collaborator Author

erikvanoosten commented Mar 15, 2022

Hi @vigoo, can you validate the documentation this PR adds please?

@erikvanoosten
Collaborator Author

So we needed a few weeks to find out that this PR is quite wrong. This documentation is very much needed!
I'll update this PR if we manage to figure it out. Any input is still very welcome.

@vigoo
Contributor

vigoo commented May 9, 2022

Sorry I somehow missed this PR before. I will try to add an example of how we use transactions with zio-kafka.

@erikvanoosten
Collaborator Author

@vigoo We have something that works but it is quite ugly ('work' pending #469 😉). There are almost 2 pages of control code while we only need to consume, map and produce/commit.

I keep thinking we should be able to totally hide the generations from the user of the library. Since it is possible that the user's code combines messages from multiple generations (and commits them together), the only safe way to do this is by not offering messages from a new generation to the consumer until all messages from the previous generation have been committed.

Roughly like this:
• Upon a re-balance, we store the offsets for the current generation and we stop offering messages to the consumer.
• As soon as those offsets are all committed, we unblock the consumer.
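The handshake in the two steps above could be sketched in plain ZIO, without any zio-kafka internals. All names here (GenerationGate, markCommitted, awaitCommitted) are hypothetical illustrations, not library API:

```scala
import zio._

// Hypothetical sketch: a gate that holds back the new generation's messages
// until all offsets of the previous generation have been committed.
final case class GenerationGate(committed: Promise[Nothing, Unit]) {
  // Called by the commit path once every offset of the old generation is committed.
  def markCommitted: UIO[Unit] = committed.succeed(()).unit

  // Called by the consumer before offering any message of the new generation.
  def awaitCommitted: UIO[Unit] = committed.await
}

object GenerationGate {
  val make: UIO[GenerationGate] =
    Promise.make[Nothing, Unit].map(GenerationGate(_))
}
```

On rebalance the library would create a fresh gate, and `awaitCommitted` would suspend the new generation's stream until the user's commits call `markCommitted`.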

WDYT?

@vigoo
Contributor

vigoo commented May 31, 2022

Here is a short summary of how we are using it. I will try to find time and document it properly :)

  • Enabling the restartStreamOnRebalancing mode. This makes all partition streams stop on rebalancing (even those that remain assigned).
  • Using partitionedAssignmentStream to get a stream of assigned partitions after each rebalance.
  • mapM on this stream of streams, and for each "generation" drain all the streams (we merge them with flatMapPar and then runDrain).
  • After each runDrain there is an onRebalance effect. The service needs to upload some buffered data to S3 here, and then commit the offsets and produce an event to a topic transactionally. This onRebalance effect, which runs after runDrain, asks the rebalance handler for a RebalanceEvent (basically by reading a queue), and when it finishes it completes a promise stored in that rebalance event.
  • The other side of rebalancing is a RebalanceListener implementation that creates and enqueues a RebalanceEvent every time in onRevoked, and then waits until the consumer stream finishes processing. This works because in restartStreamOnRebalancing mode all partition streams are stopped by the internal onRevoked handler, so blocking here is safe: the consumer stream will finish draining and run into the onRebalance step that takes the rebalance event and completes it. By waiting for the promise in the rebalance listener we make sure that rebalancing is blocked until all elements are processed and the transactional commit is done.
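The per-generation drain in the steps above could be sketched roughly like this. Note that `onRebalanceEffect`, the topic name, and the bootstrap server are hypothetical placeholders, and the exact signatures of `partitionedAssignmentStream` and the restartStreamOnRebalancing setting differ between zio-kafka versions, so treat this as a sketch rather than a working implementation:

```scala
import zio._
import zio.kafka.consumer._
import zio.kafka.serde.Serde
import zio.stream.ZStream

// Placeholder for the service's step: upload buffered data, then commit
// offsets and produce an event in one transaction, then complete the
// RebalanceEvent's promise. Hypothetical, not library API.
def onRebalanceEffect: Task[Unit] = ???

val settings =
  ConsumerSettings(List("localhost:9092"))
    .withGroupId("transactional-example")
    .withRestartStreamOnRebalancing(true) // stop ALL partition streams on rebalance

val program =
  Consumer
    .partitionedAssignmentStream(Subscription.topics("input"), Serde.int, Serde.string)
    .mapM { assigned =>                    // one element per "generation"
      ZStream
        .fromIterable(assigned)
        .flatMapPar(Int.MaxValue) { case (_, partitionStream) => partitionStream }
        .runDrain *>                       // drain the whole generation, then
        onRebalanceEffect                  // commit + produce transactionally
    }
    .runDrain
```

The key point is that nothing from the next generation is consumed until `onRebalanceEffect` has completed, which is what makes blocking in the RebalanceListener safe.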

I could not come up with anything that makes this simpler in the library, but of course it would be nice to reduce the complexity.

@jdegoes
Member

jdegoes commented Feb 14, 2023

@vigoo @erikvanoosten

What's the status of this?

@erikvanoosten
Collaborator Author

Unless @vigoo wants to spend some more time on this, I propose we close this PR. The text I had written is not correct. In addition, I am following @svroonland's work (helping out where I can, which unfortunately is not much; this code is very complex), which can later be expanded to make this problem go away completely.

@erikvanoosten
Collaborator Author

erikvanoosten commented Mar 5, 2023

Lacking proper documentation for now, the transactional test added in #644 can be used for learning purposes.
