Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

to_nsq: added async flag #1332

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion apps/to_nsq/to_nsq.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
var (
topic = flag.String("topic", "", "NSQ topic to publish to")
delimiter = flag.String("delimiter", "\n", "character to split input from stdin")
async = flag.Bool("async", false, "use async mode (default false)")

destNsqdTCPAddrs = app.StringArray{}
)
Expand Down Expand Up @@ -138,7 +139,12 @@ func readAndPublish(r *bufio.Reader, delim byte, producers map[string]*nsq.Produ
}

for _, producer := range producers {
err := producer.Publish(*topic, line)
var err error
if *async {
err = producer.PublishAsync(*topic, line, nil, nil)
Copy link
Member

@ploxiln ploxiln Mar 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is "fire and forget". Are you trying to get better throughput, or ignore errors (perhaps from some producers)? I think it would be better to address each thing more specifically (but just the one you currently need for now).

  • for better throughput, you could use PublishAsync() for all producers, or for a batch of messages, and then check the publish success of the batch
  • to ignore errors for some producers or messages, that should be an explicit option

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is for throughput, I'm at that point where to_nsq slows the software piping through it.

While we can live with a few miss on our side it totally does miss error reporting and probably a way to limit the number of simultaneous async requests.

I will improve this one further, feel free to close until then!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, rather than using PublishAsync, build an option to use multiple producers per destination nsqd (tradeoff would be more connections).

} else {
err = producer.Publish(*topic, line)
}
if err != nil {
return err
}
Expand Down