Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Publish and Flush calls to not fail on async error (slow consumer) #158

Merged
merged 4 commits into from
Feb 3, 2016

Conversation

kozlovic
Copy link
Member

@kozlovic kozlovic commented Feb 2, 2016

  • Revisited the use of nc.err to limit it to report async errors.
  • Publish and Flush calls will no longer fail if an asyn error occurs
  • Flush will report an error if connection is closed/lost during flush
  • Flush calls are failed during a reconnect
  • Tests updated

- Revisited the use of nc.err to limit it to report async errors.
- Publish and Flush calls will no longer fail if an asyn error occurs
- Flush will report an error if connection is closed/lost during flush
- Flush calls are failed during a reconnect
- Tests updated
- Don't use returned variable
- Compact call with error checking
derekcollison added a commit that referenced this pull request Feb 3, 2016
Fix Publish and Flush calls to not fail on async error (slow consumer)
@derekcollison derekcollison merged commit 530ea9e into master Feb 3, 2016
@derekcollison derekcollison deleted the fix_pub_slow_consumer branch February 3, 2016 01:02
@TursMA
Copy link

TursMA commented May 12, 2016

Guys, can you explain me how we handle error if flush returns error.

if err := Bw.Flush(); err != nil {
                if nc.err == nil {
                    nc.err = err // <--------    what happens later with nc.err?
                }
            }

Who processes error in nc.err after assignee? Why we don't make a call of processOpErr?

My background: I have some microservices which live on different hosts. Nats is mq. I've faced with dropping messages in nats flow and now I'm trying to find all possible cases. I have very low message rate and it's not slow customer. Messages disappear. I make a EncodedConn.Request and don't receive message in subscriber. Subscribtion occurs before publishing. I have just one connection to nats per microservice. So I have to receive message, but...
Another suspucted place I found is gnatsd deliverMsg function.

writeErr:
    if deadlineSet {
        client.nc.SetWriteDeadline(time.Time{})
    }
    client.mu.Unlock()

    if ne, ok := err.(net.Error); ok && ne.Timeout() {
        atomic.AddInt64(&client.srv.slowConsumers, 1)
        client.Noticef("Slow Consumer Detected")
        client.closeConnection()
    } else {
        c.Debugf("Error writing msg: %v", err) //<--------- just printing error without disconnecting or reporting to stats.
    }

@derekcollison
Copy link
Member

Run the server with -DV, that should help you trace where the messages are being lost. What is the timeout on the request?

I have really only seen dropped messages via slow consumers, but if the request timeout is too short that could be a cause.

@TursMA
Copy link

TursMA commented May 12, 2016

Thank you for very quick response!
timeout is default - 2 seconds.
Tomorrow I'll try to reproduce with -DV flags.
JFIY possible errors on write operations (first google page) - http://man7.org/linux/man-pages/man2/write.2.html

What about error handling for Flush? There are a lot of logic for transmitting error to nc.err and then just ignore it.

@kozlovic
Copy link
Member Author

It's not ignored. The code you refer to is the flusher, which is a go-routine. There is no way to return directly the error condition there. So this is done through nc.err, which you can get by calling LastError() from anywhere.

@TursMA
Copy link

TursMA commented May 13, 2016

Why we do not do reconnect in the same way like on Read error?
LastError() is useless because of around goroutines. We don't know a moment when I have to do a call.

@kozlovic
Copy link
Member Author

Well, that's the thing (and discussed in the PR) we let the readLoop do the reconnect (it should get an error on read too if the socket is gone)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants