Better capture pending writes when getting disconnected #86

Open

kozlovic opened this issue Oct 13, 2015 · 19 comments
@kozlovic
Member

Imagine doing nc.Publish() in a loop. The server is stopped/restarted and the library tries to reconnect. The pending buffer is created only in doReconnect(). That leaves a window where nc.bw.Write() calls may accumulate to the point of triggering a flush (a socket write), which will likely result in an error being returned from the nc.Publish() call.
Also, the use of a buffered writer may cause situations where the buffer is flushed and some leftover is then appended to the now-empty buffer. If a reconnect occurs at the "right" time, this data will then be sent to the server, which may cause a parse error (if what was in the buffer was a partial), which would result in another reconnect.

@ColinSullivan1
Member

With regards to the buffered writer on reconnect, the buffer could be trimmed to discard a partial message before sending the data to the server. Thoughts?

@derekcollison
Member

I have this fixed now: we don't do partials, but we watch the outbound buffer size and, if it is exceeded, we produce an error on publish. It's also configurable.

@kozlovic
Member Author

Not sure if this captures the original issue I tried to describe. Suppose we are connected, and here is a view of nc.bw (the '.' indicates free space):

Publish 'abc':
[PUB foo 3\r\nabc\r\n....] (4 bytes left)

Publish 'def':
[PUB foo 3\r\nabc\r\nPUB ] => automatically flushed by bufio
[foo 3\r\ndef\r\n........] (8 bytes left)

At this point, we disconnect. The content of nc.bw is moved to pending. Then say we reconnect, resend the subscriptions (if any) and flush the pending buffer to the newly created connection. We are going to send:

[foo 3\r\ndef\r\n........]

Which would result in a parse error on the server. Am I missing something?
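
For illustration, a minimal, runnable sketch of that scenario using plain bufio with a 20-byte writer (a stand-in for the real connection, not the actual nats.go internals):

package main

import (
	"bufio"
	"bytes"
	"fmt"
)

func main() {
	var socket bytes.Buffer // stands in for the TCP connection
	bw := bufio.NewWriterSize(&socket, 20)

	bw.WriteString("PUB foo 3\r\nabc\r\n") // 16 bytes buffered, 4 left
	bw.WriteString("PUB foo 3\r\ndef\r\n") // overflows: bufio flushes 20 bytes

	fmt.Printf("flushed to socket: %q\n", socket.String())
	// flushed to socket: "PUB foo 3\r\nabc\r\nPUB " (note the trailing partial)
	fmt.Printf("still buffered: %d bytes\n", bw.Buffered())
	// still buffered: 12 bytes ("foo 3\r\ndef\r\n")
}

A disconnect at this point would strand those 12 buffered bytes as a partial frame, which is exactly what would be replayed to the server on reconnect.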

@derekcollison
Member

derekcollison commented Jan 20, 2016

During disconnect we have a bytes.Buffer-backed BufferedWriter. So as things move along, writes accumulate in nc.bw and flush to nc.pending. When we reconnect, we flush bw to nc.pending and send that to the new socket connection, then replace nc.bw with a new BufferedWriter backed by the socket.
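
A hedged sketch of that swap, using simplified, hypothetical field names (bw, pending); this illustrates the approach, not the actual nats.go implementation:

package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
)

// conn is a hypothetical, stripped-down stand-in for the client connection.
type conn struct {
	bw      *bufio.Writer
	pending *bytes.Buffer
}

// onDisconnect repoints the buffered writer at an in-memory buffer so that
// publishes keep accumulating while the socket is gone.
func (c *conn) onDisconnect() {
	c.pending = new(bytes.Buffer)
	c.bw = bufio.NewWriter(c.pending)
}

// onReconnect drains everything buffered while disconnected to the new
// socket, then points the writer back at the socket.
func (c *conn) onReconnect(socket io.Writer) error {
	c.bw.Flush() // move any buffered bytes into c.pending
	if _, err := socket.Write(c.pending.Bytes()); err != nil {
		return err
	}
	c.pending = nil
	c.bw = bufio.NewWriter(socket)
	return nil
}

func main() {
	c := &conn{}
	c.onDisconnect()
	c.bw.WriteString("PUB foo 3\r\nabc\r\n") // published while disconnected

	var newSocket bytes.Buffer
	if err := c.onReconnect(&newSocket); err != nil {
		panic(err)
	}
	fmt.Printf("replayed on reconnect: %q\n", newSocket.String())
}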

@kozlovic
Member Author

I know that. The problem is the state of nc.bw prior to the disconnect. If you look at the example above, nc.bw contains a partial before the disconnect.

@derekcollison
Member

If that is the case, we will get immediately disconnected and reconnect again, so we will have data loss. Not sure it's worth guarding against this when we create the pending buffer, but we could. We should create a test case first, for sure.

@kozlovic
Member Author

Re-opening but low priority.

@kozlovic kozlovic reopened this Sep 19, 2018
@derekcollison derekcollison self-assigned this Sep 19, 2018
@rwrz

rwrz commented Jul 14, 2021

I'm not sure if this is my issue, but consider this:

Config

  • PingInterval is 5 seconds
  • MaxPingsOutstanding = 2
  • nats client will only perceive its disconnection after 10 seconds

Client Behavior

  • client sends ASYNC messages every 5 seconds
  • the client gets disconnected for 30 seconds
  • then re-connects

Question

What is supposed to happen to all the messages sent during the first 10 seconds, before the client realizes it has lost its connection and starts trying to re-connect?

According to my tests, we lose all messages sent before it starts trying to re-connect.
Is that supposed to happen? Losing data?

Also, I tried JetStream with PublishAsync, and the same thing happens.

@kozlovic
Member Author

@rwrz What you describe seems to be the basic behavior of being able to publish messages while in the disconnected state, which should work as long as you have a big enough reconnect buffer to hold the published messages.

I am not clear on what you mean by "client sends ASYNC messages". In core NATS, all publish calls are async in nature. The Publish() call simply adds data to an outgoing buffer that is flushed from an internal goroutine, or in place when the buffer gets full.
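
(For reference, the size of that reconnect buffer is configurable. A minimal sketch, assuming the standard nats.go ReconnectBufSize option, which defaults to 8 MB:)

package main

import "github.com/nats-io/nats.go"

func main() {
	// Enlarge the buffer that holds publishes made while disconnected, so
	// more messages survive until the connection is re-established.
	nc, err := nats.Connect(nats.DefaultURL,
		nats.ReconnectBufSize(16*1024*1024), // 16 MB instead of the default
	)
	if err != nil {
		panic(err)
	}
	defer nc.Close()
}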

Could you share a simple test case (including connection configuration, etc..) that demonstrates the issue?

@rwrz

rwrz commented Jul 14, 2021

By "client sends ASYNC messages" I was referring to the JetStream test.

Testing code:

package main

import (
	"fmt"
	"github.com/nats-io/nats.go"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

	fmt.Println("Connecting...")

	natsConn, err := nats.Connect(
		"wss://ENDPOINT:443",
		nats.MaxReconnects(99),
		nats.ReconnectWait(time.Second*2),
		nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
			fmt.Printf("NATS got disconnected! Reason: %q \n", err)
		}),
		nats.ReconnectHandler(func(nc *nats.Conn) {
			fmt.Printf("NATS got reconnected to %v!\n", nc.ConnectedUrl())
		}),
		nats.ClosedHandler(func(nc *nats.Conn) {
			fmt.Printf("NATS connection closed. Reason: %q\n", nc.LastError())
		}),
		nats.PingInterval(time.Second*10), // <--- HERE
		nats.MaxPingsOutstanding(2),       // <--- HERE
		nats.Compression(true),
		nats.RetryOnFailedConnect(true),
		nats.UserInfo("USER", "PASSWORD"),
	)
	if err != nil {
		panic(err)
	}
	defer natsConn.Drain()

	jsConn, err := natsConn.JetStream()
	if err != nil {
		panic(err)
	}

	_, err = jsConn.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"TEST.*"},
	})
	if err != nil {
		// Ignored on purpose: the stream may already exist.
		// panic(err)
	}
	defer jsConn.DeleteStream("TEST")

	counter := 0
	d := time.NewTicker(time.Second * 5)

	_, err = jsConn.Subscribe("TEST.PING", func(msg *nats.Msg) {
		fmt.Printf("PING RECEIVED [%s] \n", string(msg.Data))
	}, nats.DeliverAll()) // all messages
	if err != nil {
		panic(err)
	}

	fmt.Println("Starting our loop...")
	for {
		select {
		case <-d.C:
			fmt.Printf("connected: [%t] - reconnecting: [%t] \n", natsConn.IsConnected(), natsConn.IsReconnecting())
			counter++
			_, err = jsConn.PublishAsync("TEST.PING", []byte(fmt.Sprintf("%d", counter)), nats.MsgId(fmt.Sprintf("%d", counter)))
			if err != nil {
				fmt.Println(err)
			}
			fmt.Printf("PING SENT [%d] \n", counter)
		case <-signals:
			return
		}
	}
}

Now, the result: after PING SENT [2] I disconnected from my internet... then turned it on again at PING 9.

Connecting...
Starting our loop...
connected: [true] - reconnecting: [false] 
PING SENT [1] 
PING RECEIVED [1] 
connected: [true] - reconnecting: [false] 
PING SENT [2] 
PING RECEIVED [2] 
connected: [true] - reconnecting: [false] 
PING SENT [3] 
connected: [true] - reconnecting: [false] 
PING SENT [4] 
connected: [true] - reconnecting: [false] 
PING SENT [5] 
connected: [true] - reconnecting: [false] 
PING SENT [6] 
connected: [true] - reconnecting: [false] 
PING SENT [7] 
NATS got disconnected! Reason: "nats: stale connection"
connected: [false] - reconnecting: [true] 
PING SENT [8] 
connected: [false] - reconnecting: [true] 
PING SENT [9] 
NATS got reconnected to wss://.....:443
PING RECEIVED [8] 
PING RECEIVED [9] 
connected: [true] - reconnecting: [false] 
PING SENT [10] 
PING RECEIVED [10] 
connected: [true] - reconnecting: [false] 
PING SENT [11] 
PING RECEIVED [11] 
connected: [true] - reconnecting: [false] 
PING SENT [12] 
PING RECEIVED [12] 
connected: [true] - reconnecting: [false] 
PING SENT [13] 
PING RECEIVED [13] 
connected: [true] - reconnecting: [false] 
PING SENT [14] 
PING RECEIVED [14] 

As you can see, all PINGS between 2 and 8 (3 to 7) are lost.
It only buffers messages after it starts to reconnect.

P.S.: I'm using WebSocket + JetStream, but the behaviour is the same on core NATS.

@kozlovic
Member Author

Please set an async error handler to see if you get any error reported there.
The JS publish async is likely to report an error since it did not get the pub ack back, but the message should still have been added to the outgoing buffer.
You are saying that you have the same issue without WebSocket, but are you always using the JetStream publish, or would you have the same issue without JS at all, with just plain subscribe and plain publish?
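
For reference, a minimal sketch of registering the async error handler with the standard nats.go ErrorHandler option (shown here as an extra option on the Connect call from the test program above):

natsConn, err := nats.Connect(
	"wss://ENDPOINT:443",
	// ... other options as in the test program above ...
	nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
		// sub may be nil for connection-level errors
		fmt.Printf("NATS async error: %v\n", err)
	}),
)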

@kozlovic
Member Author

Ok, I think I understand what's going on. When you disable your internet, the connection is not marked as closed (by the way, this is what the pings/pongs are for), so the library does not know that there is an issue. When you publish message 3, the message is added to the outgoing buffer, and an internal goroutine flushes it out to the socket. That write does NOT fail, but the data is cleared from the NATS library buffer, so it is no longer available to us.

When the ping timer routine detects that it has missed the configured number of outstanding pings, it CLOSES the connection and switches to a pending (memory) buffer. This is why all publish calls made after the library is in reconnecting mode, even though it is not yet connected, will ultimately be sent to the server once the connection is established.

So it is normal that messages 3 to 7 (in your example) are not sent, and therefore not received.
You would have to use the sync version, or check the publish async future result, to know if the publish failed and, if so, maybe resend it, etc.

@rwrz

rwrz commented Jul 14, 2021

I have added the error handler; nothing there.
Also, PublishAsync doesn't report any errors. But I discovered something interesting:

If I disconnect from the internet and re-connect BEFORE the connection goes to the 'stale' state, data is not lost (pings 3, 4 and 5).

So, something happens when the connection goes to the 'stale' state that loses all the pending messages.
Check the logs:

  • connected
  • ping 1,2
  • disconnected
  • ping 3,4,5
  • connected
  • pings 3,4,5 (recovered)
  • ping 6-11
  • disconnected
  • ping 12-15
  • connection goes to stale state
  • ping 16-17
  • reconnected
  • lost pings 12-15
  • recovered 16-17
  • ...
Starting our loop...
connected: [true] - reconnecting: [false] 
PING SENT [1] 
PING RECEIVED [1] 
connected: [true] - reconnecting: [false] 
PING SENT [2] 
PING RECEIVED [2] 
connected: [true] - reconnecting: [false] 
PING SENT [3] 
connected: [true] - reconnecting: [false] 
PING SENT [4] 
connected: [true] - reconnecting: [false] 
PING SENT [5] 
PING RECEIVED [3] 
PING RECEIVED [4] 
PING RECEIVED [5] 
connected: [true] - reconnecting: [false] 
PING SENT [6] 
PING RECEIVED [6] 
connected: [true] - reconnecting: [false] 
PING SENT [7] 
PING RECEIVED [7] 
connected: [true] - reconnecting: [false] 
PING SENT [8] 
PING RECEIVED [8] 
connected: [true] - reconnecting: [false] 
PING SENT [9] 
PING RECEIVED [9] 
connected: [true] - reconnecting: [false] 
PING SENT [10] 
PING RECEIVED [10] 
connected: [true] - reconnecting: [false] 
PING SENT [11] 
PING RECEIVED [11] 
connected: [true] - reconnecting: [false] 
PING SENT [12] 
connected: [true] - reconnecting: [false] 
PING SENT [13] 
connected: [true] - reconnecting: [false] 
PING SENT [14] 
connected: [true] - reconnecting: [false] 
PING SENT [15] 
NATS got disconnected! Reason: "nats: stale connection" 
connected: [false] - reconnecting: [true] 
PING SENT [16] 
connected: [false] - reconnecting: [true] 
PING SENT [17] 
NATS got reconnected to wss://.....:443!
connected: [true] - reconnecting: [false] 
PING SENT [18] 
PING RECEIVED [16] 
PING RECEIVED [17] 
PING RECEIVED [18] 
connected: [true] - reconnecting: [false] 
PING SENT [19] 
PING RECEIVED [19] 
connected: [true] - reconnecting: [false] 
PING SENT [20] 
PING RECEIVED [20] 
connected: [true] - reconnecting: [false] 
PING SENT [21] 
PING RECEIVED [21] 
connected: [true] - reconnecting: [false] 
PING SENT [22] 
PING RECEIVED [22] 

@rwrz

rwrz commented Jul 14, 2021

Oh, I was typing when you answered.
So this is the expected behaviour? Losing all my pending async requests when it goes stale?

Well, it seems we already have a pending buffer, and then we use a different one after we lose the connection.
Why not move all pending messages from one to the other when the connection is lost?

Considering:

  • re-connection is a feature
  • we can send async messages, trusting that NATS will handle them properly
  • it should communicate that they were never delivered OR deliver them when the connection is available again

If you point me to where those buffers are, I can move the data from one to the other when it goes stale and propose a PR.


[EDITED]
Ok. PublishAsync returns a PubAckFuture where I can listen on the Err() channel. Even so, it doesn't work: I was listening for a few minutes and nothing happened. It fires Ok(), but never fires Err().
Although, I believe NATS should handle delivery issues like this caused by a re-connection. If not, why even bother to re-connect if I will lose everything that is pending?

	go func() {
		select {
		case <-ack.Ok():
			fmt.Printf("PING SENT [%s] \n", string(ack.Msg().Data))
		case <-ack.Err():
			fmt.Printf("PING ERROR [%s] \n", string(ack.Msg().Data))
		}
	}()

@wallyqs
Member

wallyqs commented Jul 14, 2021

@rwrz There is a nats.FlusherTimeout option that might help in this case, by detecting the write error faster instead of waiting for the outstanding-pings limit to be hit first:

nats.go/nats.go, line 331 at commit 274aa57:

FlusherTimeout time.Duration
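
A minimal sketch of using that option (the 2-second value is arbitrary):

package main

import (
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	// Bound how long a socket flush may block, so a dead connection is
	// detected sooner than the ping/pong staleness check would allow.
	nc, err := nats.Connect(nats.DefaultURL,
		nats.FlusherTimeout(2*time.Second),
	)
	if err != nil {
		panic(err)
	}
	defer nc.Close()
}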

@kozlovic
Member Author

@rwrz You have to understand that when the library writes to the socket, the system holds the data in its own buffer and the library no longer has access to it. When your internet connection is down but the TCP stack does not report the failure, the library's writes to the socket won't fail, yet it won't have access to that data anymore.

We could transfer what is still in the library buffer (anything there prior to the flusher doing a socket write) into the memory buffer once the library is notified of the disconnect and creates that memory buffer. This is what we used to do, but it can lead to the original issue opened here (the presence of partials in the buffer would cause a disconnect anyway, because the server would reject them as an invalid protocol).

Again, as soon as Publish() returns, the data has been added to a buffer that a goroutine is responsible for "flushing", that is, writing to the socket. At that point, the library buffer will be empty but the system socket buffer will hold the data. We don't have control over that one.

Regarding the pub ack future, you have to use a case <-time.After(xx): because Err() and Ok() wait for the response from the server, which will never come back if the message was never sent.
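
A minimal sketch of that pattern, reusing jsConn and counter from the test program above (the helper name and the 5-second timeout are made up for illustration; it needs "fmt", "time", and the nats.go package imported):

func publishWithTimeout(jsConn nats.JetStreamContext, counter int) {
	ack, err := jsConn.PublishAsync("TEST.PING", []byte(fmt.Sprintf("%d", counter)))
	if err != nil {
		fmt.Println(err) // the message could not even be enqueued
		return
	}
	select {
	case <-ack.Ok():
		fmt.Printf("PING ACKED [%d] \n", counter)
	case err := <-ack.Err():
		fmt.Printf("PING ERROR [%d]: %v \n", counter, err)
	case <-time.After(5 * time.Second):
		// Neither Ok() nor Err() fires if the message never reached the
		// server, so bound the wait and consider resending.
		fmt.Printf("PING TIMEOUT [%d] \n", counter)
	}
}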

@kozlovic
Member Author

@rwrz To clarify, imagine NATS has a buffer containing "ABC":

  • we call write(socketID, buf, 3); this call will return OK even if you have disabled your internet connection (until the TCP stack reports the connection as invalid).
  • after this call, the NATS buffer will be empty because write() was successful.

If you plug your internet back in before TCP reports the failure, the OS will flush its socket buffer "ABC" and that message will reach the server.

If the NATS library misses the pings (based on your configuration), it will close its end of the connection, but again, it has NOTHING in its own buffer (at the library level, "ABC" is in the OS socket buffer), so when creating the memory buffer there is nothing to copy over.

As I said, "ABC" could still be in the library buffer before the flusher writes it to the socket (or before the Publish() call itself does, if the library buffer is full), but then we are back to the possible issues described here.

The point is that NATS cannot hold onto data that the OS write() has taken ownership of, and this is why core NATS is fire-and-forget.

Using JS, you have the ability to publish synchronously (which waits for the confirmation in place) or, if you publish async, to check on the completion and resend if you get errors. With a de-dup window configured on the stream, it is safe to resend a message that was already received by the server.
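
A hedged sketch of that resend pattern, reusing jsConn and the TEST.PING subject from the test program above (the helper name, retry count, and delay are made up for illustration):

func publishWithRetry(jsConn nats.JetStreamContext, counter int) error {
	data := []byte(fmt.Sprintf("%d", counter))
	var err error
	for attempt := 0; attempt < 3; attempt++ {
		// The synchronous Publish waits for the pub ack in place; an error
		// means the server did not confirm. Resending with the same MsgId
		// is safe as long as it falls within the stream's de-dup window.
		if _, err = jsConn.Publish("TEST.PING", data, nats.MsgId(fmt.Sprintf("%d", counter))); err == nil {
			return nil
		}
		time.Sleep(time.Second)
	}
	return err
}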

@rwrz

rwrz commented Jul 14, 2021

@kozlovic thank you for all this clarification.
I think the easiest path, using JS, will be to use the synchronous publish and then just re-send the message in case of failure.
Either it will error again, or it will go to the pending buffer (if already in the stale state).

Everything you said makes sense.

I hope this thread can help others with similar issues (and questions).

@kozlovic
Member Author

@rwrz Thanks! I am happy that I made myself clear and that it helped you understand what was happening.
