
Create a sane Writer + Batch System #852

Open
arianitu opened this issue Feb 17, 2022 · 11 comments

arianitu commented Feb 17, 2022

Describe the solution you'd like

The Writer abstraction is a bad design when it comes to writing messages and batching.

Current System

Users call WriteMessages, which takes N messages. The user must collect those N messages themselves before calling WriteMessages, which effectively means building a batching system on top of a batching system. What's worse, if a user calls WriteMessages once per message in a for loop (the most common way of writing messages from streams, files, etc.), the default BatchTimeout throttles them to roughly one message per second.

There are lots of issues where users switch BatchTimeout to 10 * time.Millisecond, which is just a hack to get the desired, intuitive behaviour.
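For reference, the workaround usually looks something like the sketch below. Addr, Topic, and BatchTimeout are real kafka.Writer fields; the broker address and topic name are placeholders:

```go
package kafkaexample

import (
	"time"

	"github.com/segmentio/kafka-go"
)

func newWriter() *kafka.Writer {
	return &kafka.Writer{
		Addr:  kafka.TCP("localhost:9092"), // placeholder broker
		Topic: "topic-A",                   // placeholder topic
		// Hack: shrink BatchTimeout (default 1s) so a synchronous
		// WriteMessages call in a loop is not throttled to ~1 msg/s.
		BatchTimeout: 10 * time.Millisecond,
	}
}
```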

New System

Users call WriteMessage, which batches internally and sends messages once either BatchSize is reached or BatchTimeout elapses. This API is intuitive and anyone can use it. Users who send messages in a for loop (the most common way of writing messages from streams, files, etc.) will get the expected behaviour.

The user no longer has to build a batching system on top of the existing batching system; they just call WriteMessage with each of their messages and the writer handles the rest.

Contributor

nlsun commented Feb 25, 2022

Hi @arianitu, have you had a chance to try the Writer in "asynchronous mode"?

Docs: https://pkg.go.dev/github.com/segmentio/kafka-go#Writer

Code:

kafka-go/writer.go

Lines 54 to 74 in b952e63

// In asynchronous mode, the program may configure a completion handler on the
// writer to receive notifications of messages being written to kafka:
//
//	w := &kafka.Writer{
//		Addr:         kafka.TCP("localhost:9092"),
//		Topic:        "topic-A",
//		RequiredAcks: kafka.RequireAll,
//		Async:        true, // make the writer asynchronous
//		Completion: func(messages []kafka.Message, err error) {
//			...
//		},
//	}
//
//	...
//
//	// Because the writer is asynchronous, there is no need for the context to
//	// be cancelled, the call will never block.
//	if err := w.WriteMessages(context.Background(), msgs...); err != nil {
//		// Only validation errors would be reported in this case.
//		...
//	}


hadrienk commented Mar 2, 2022

I think the WriteMessages() implementation is problematic in several ways:

It can either be asynchronous or not, depending on the config. I think we should have dedicated async methods. Something like:

func (w *BufferedWriter) WriteMessagesAsync(ctx context.Context, messages ...kafka.Message) <-chan error {
  // ...
}

The current WriteMessages method can buffer, time out, or be async. This is too complicated. In some cases we want to write messages immediately. We should have a method for that:

// WriteMessages writes the messages. This method blocks until all the messages are sent.
func (w *BufferedWriter) WriteMessages(ctx context.Context, messages ...kafka.Message) error {
  // ...
}

In cases where performance matters, we could add a buffered API:

// WriteBuffered saves the messages in a buffer, flushing every BufferSize.
// If there is room in the buffer, no messages are written. The returned error is the first
// error returned by the Flush method.
func (w *BufferedWriter) WriteBuffered(ctx context.Context, messages ...kafka.Message) error {
	// [...]
}

// Flush sends and then empties the content of the buffer. 
func (w *BufferedWriter) Flush(ctx context.Context) error {
  // [...]
}

Building on the API above, we could add other methods to handle timeouts:

// WriteBufferedAndFlushAfter works as WriteBuffered but also calls a Flush with the same context after duration.
func (w *FlushingWriter) WriteBufferedAndFlushAfter(ctx context.Context, delay time.Duration, messages ...kafka.Message) chan error {
  // [...]
}

I have a working implementation with tests and could contribute if you agree @achille-roussel / @nlsun. @arianitu it would be nice to have your feedback as well.

@achille-roussel achille-roussel self-assigned this Mar 4, 2022
@derekadams

It may make sense to use an existing library such as https://pkg.go.dev/github.com/eapache/go-resiliency@v1.2.0/batcher#section-readme to handle batching, and have it call WriteMessages as a side effect.

@lovromazgon

I see another problem with the current implementation - batching is forced on the developer. There is no way for the developer to disable it and handle batching externally. If the developer tries to manually collect batches and decide when to flush them to Kafka, the writer will either break batches down into even smaller sizes or wait until the batch timeout is reached.

I would prefer the approach proposed by @derekadams: WriteMessages should just write whatever messages it gets directly to Kafka without any batching logic; batching can then be implemented as a wrapper around that function.

Contributor

nlsun commented Sep 2, 2022

Hi, thanks for all the ideas. We probably won't change the API of WriteMessages, but going by the comments it could be worth looking into creating a new function that:

  • Writes synchronously
  • Does not do any re-batching under the hood

This can act as a building block for those that want fine grained control over batching.

How does that sound? If someone is interested we'd be happy to review a pull request.


yusufozturk commented Sep 11, 2022

Same here, @nlsun. We are already doing our batching, so I would like to send the messages directly to Kafka.

The weird thing about this library is that it forces you to do batching while leaving all the other functions private, so nobody can build something useful on top of the base functions.

Also, if someone uses this library's batching, it means they accept losing messages: what happens if the application crashes while a batch is still buffered? All those messages are gone. So maybe nobody uses this library for important message delivery, but rather for cheap messages like metrics, logs, etc.

I think I will test Sarama. I might also send a PR for my requirements.

Edit: Later on, I saw the Produce functions. I think people can skip the batching behaviour via these methods, right? But I will probably strip out most of the library and keep just the produce methods. Our goal is low latency and low resource usage when sending messages to Kafka.

Edit 2: Apparently Sarama has had some really bad updates recently: there are a lot of data races, panics, etc. in the issues opened in the last 30 days. So I will focus on this library but refactor the base code for our needs.

@yusufozturk

Has anybody been refactoring the Writer + Batch System since February? Maybe I can fork from where you left off. @arianitu Thanks.

@yusufozturk

@nlsun Sent the PR.

Contributor

nlsun commented Sep 20, 2022

@yusufozturk thanks for the PR. Just to clarify, since I ended up forgetting the original purpose of this issue:

The original ask was something along the lines of

Add a non-blocking WriteMessages that also guarantees messages are sent.

We ended up discussing a slightly different goal of

Add a way to bypass the batching in Writer.WriteMessages

I created a new issue to discuss bypassing the batching in Writer.WriteMessages here: #994

@3AceShowHand
Contributor

Any update on this issue?

Our service originally used sarama to implement the producer, but it suffered from several problems inside that library, so we decided to try this one instead.

Our application sends messages one by one and does not do any batching before calling writer.WriteMessages. This seems to cause a performance problem: our throughput dropped from 20k/s to 17k/s compared to the sarama producer.

From the comments above, it sounds like we should batch messages before sending them to the writer. Will that improve throughput?

Contributor

nlsun commented Feb 24, 2023

@3AceShowHand we haven't had anyone volunteer to implement this feature yet.

For your immediate performance issues, yes I believe batching will improve your throughput.

peliseev pushed a commit to peliseev/kafka-go that referenced this issue Jul 14, 2023
Add BatchNoWait parameter. This parameter resolves the performance issue that occurs when a synchronous client waits for a BatchTimeout, even when there is nothing left to flush.

See segmentio#852