Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix writer async close #805

Merged
merged 1 commit into from
Dec 13, 2021
Merged

fix writer async close #805

merged 1 commit into from
Dec 13, 2021

Conversation

achille-roussel
Copy link
Contributor

@achille-roussel achille-roussel commented Dec 5, 2021

This PR simplifies and fixes internal synchronization in the kafka.Writer type.

There were a few issues:

  • marking the writer as closed was done via an atomic operations in Close, racing with the check in WriteMessages
  • increments of the partitionWriter wait group was done asynchronously after starting goroutines, racing with calls to Wait on these wait groups

I removed the sync group in partitionWriter and instead managed the count of inflight operations globally on a single wait group in Writer; we only care about the writer's Close call to block until all inflight operations are done, there is no need to have a hierarchical synchronization scheme.

Fixes #778

@achille-roussel achille-roussel merged commit 063303c into main Dec 13, 2021
@achille-roussel achille-roussel deleted the fix-writer-async-close branch December 13, 2021 04:50
@egaldamez
Copy link

Hello, this is an interesting fix. I'm currently facing an issues in which I'm trying to Close a writer so that buffered messages would be flushed and sent to my cluster. However, this doesn't always happen. My scenario is as follows:

  1. I have a go application (v1.7) that accepts http requests and these requests are transformed into kafka messages (buffered by the application and flushed every 15 seconds or 1000 messages)
  2. I use vegeta to send traffic to this application @ 1000 msg/s
  3. I stop vegeta. At this point, my application has some messages buffered.
  4. I send an OS interrupt signal to my application (which is configured to listen to these signals)
  5. I expect that the messages buffered are flushed, but this does not always happen especially when I am sending a high volume of messages.

So I'm wondering if this could be because of the race conditions addressed in this PR. I'm looking forward to fetching the latest version and testing this for myself :) thank you.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Close function runs before Completion function when writing async
3 participants