Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clear lastErr on reconnect success #503

Merged
merged 3 commits into from Aug 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 28 additions & 1 deletion nats.go
Expand Up @@ -1453,6 +1453,7 @@ func (nc *Conn) connect() error {
if err == nil {
nc.srvPool[i].didConnect = true
nc.srvPool[i].reconnects = 0
nc.current.lastErr = nil
returnedErr = nil
break
} else {
Expand Down Expand Up @@ -1622,7 +1623,6 @@ func normalizeErr(line string) string {
// applicable. Will wait for a flush to return from the server for error
// processing.
func (nc *Conn) sendConnect() error {

// Construct the CONNECT protocol string
cProto, err := nc.connectProto()
if err != nil {
Expand Down Expand Up @@ -1849,6 +1849,18 @@ func (nc *Conn) doReconnect(err error) {

// Process connect logic
if nc.err = nc.processConnectInit(); nc.err != nil {
// If we have a lastErr recorded for this server
// do the normal processing here. We might get closed.
if nc.current.lastErr != nil {
err := nc.err
nc.mu.Unlock()
nc.processErr(err.Error())
nc.mu.Lock()
if nc.isClosed() {
break
}
}

nc.status = RECONNECTING
// Reset the buffered writer to the pending buffer
// (was set to a buffered writer on nc.conn in createConn)
Expand Down Expand Up @@ -1891,12 +1903,19 @@ func (nc *Conn) doReconnect(err error) {
if nc.Opts.ReconnectedCB != nil {
nc.ach.push(func() { nc.Opts.ReconnectedCB(nc) })
}

lastErr := nc.current.lastErr

// Release lock here, we will return below.
nc.mu.Unlock()

// Make sure to flush everything
nc.Flush()

if lastErr != nil && !nc.IsClosed() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As seen in some of the Travis builds, I am not sure this would work. Looking at server code, I did not find a place where a client would be rejected right at the processing of the connect, therefore, I believe that the initial client PING (Flush here) could be successful and still have the auth expired right after. Which if that were to happen would have the effect to clear the error, get expired right after, reconnect, clear error, expire, etc..

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The server should queue the error on CONNECT iff the credentials have expired when the CONNECT arrives. I am seeing the flappers. Ran it locally for >1k times for both tests and saw no problems.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should not fire the reconnect CB until we get the results of the flush back.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you point me to the server code where the -ERR is sent when receiving the CONNECT? I don't see that. If that is not happening, then the CONNECT followed by PING could result in PONG being received before any -ERR is received.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/nats-io/nats-server/blob/master/server/auth.go#L321

IIRC CONNECT triggers a process CONNECT which will check auth and if false is returned will send ERR and close the connection.

https://github.com/nats-io/nats-server/blob/master/server/client.go#L1239

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I am following. If we get same error for same server that is auth error of some sort we bail and close the connection.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to jump on GH if I am not following your train of thought correctly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Say that the client connects OK (jwt still valid), when the JWT expires, the server will send -ERR and close connection. The client receives the -ERR from readLoop and call processErr which does not close connection and allow a reconnect.
Now we are in doReconnect(). The createConn() calls sendConnect() and once the JWT has expired, from what you said, the server should have sent an ERR right away, so the client lib should receive this in readProto() BEFORE readLoop is even started. So processErr is not in play here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's jump on GH when you have a few minutes. I think I am following you but want to chat.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok pushed a change that should handle correctly. Take a look when you have a minute.

nc.clearCurrentLastErr()
}

return
}

Expand Down Expand Up @@ -2278,6 +2297,14 @@ func (nc *Conn) processAuthError(err error) {
nc.mu.Unlock()
}

// clearCurrentLastErr will clear the last error when we know we have
// successfully connected after a flush.
func (nc *Conn) clearCurrentLastErr() {
nc.mu.Lock()
nc.current.lastErr = nil
nc.mu.Unlock()
}

// flusher is a separate Go routine that will process flush requests for the write
// bufio. This allows coalescing of writes to the underlying socket.
func (nc *Conn) flusher() {
Expand Down
13 changes: 13 additions & 0 deletions nats_test.go
Expand Up @@ -1454,6 +1454,9 @@ func TestExpiredUserCredentials(t *testing.T) {
if err := WaitTime(ch, 2*time.Second); err != nil {
t.Fatal("Should have closed after multiple failed attempts.")
}
if stats := nc.Stats(); stats.Reconnects > 2 {
t.Fatalf("Expected at most 2 reconnects, got %d", stats.Reconnects)
}
}

func TestExpiredUserCredentialsRenewal(t *testing.T) {
Expand Down Expand Up @@ -1514,9 +1517,19 @@ func TestExpiredUserCredentialsRenewal(t *testing.T) {
t.Fatal("Should have reconnected.")
}

// We should not have been closed.
if nc.IsClosed() {
t.Fatal("Got disconnected when we should have reconnected.")
}

// Check that we clear the lastErr that can cause the disconnect.
// Our reconnect CB will happen before the clear. So check after a bit.
time.Sleep(50 * time.Millisecond)
nc.mu.Lock()
defer nc.mu.Unlock()
if nc.current.lastErr != nil {
t.Fatalf("Expected lastErr to be cleared, got %q", nc.current.lastErr)
}
}

// If we are using TLS and have multiple servers we try to match the IP
Expand Down