Skip to content

Commit

Permalink
[ADDED] RetryOnFailedConnect option
Browse files Browse the repository at this point in the history
Normally, nats.Connect() would fail if there is no server available
when this call is executed. With this new option, if no connection
can be made, this call will return no error and will trigger code
similar to the reconnect code. Therefore, MaxReconnect and ReconnectWait
options are used as if the library had been disconnected and is trying
to reconnect.
Note that subscription and publish calls will also behave as if the
library was in reconnection mode, which means that the calls are
buffered and produce no error until the reconnect buffer size is
full.
Obviously, since the connection is not connected, Flush or Request/Reply
calls would timeout.

If the ReconnectHandler is set, it will be invoked if the library
connects asynchronously.

Unrelated: fixed a test that had a t.skip()...

Resolves #195

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Jul 8, 2020
1 parent a16f8c9 commit 8a78c4e
Show file tree
Hide file tree
Showing 4 changed files with 314 additions and 54 deletions.
15 changes: 15 additions & 0 deletions README.md
Expand Up @@ -285,6 +285,21 @@ nc.QueueSubscribe("foo", "job_workers", func(_ *Msg) {

```go

// Normally, the library will return an error when trying to connect and
// there is no server running. The RetryOnFailedConnect option will set
// the connection in reconnecting state if it failed to connect right away.
nc, err := nats.Connect(nats.DefaultURL,
nats.RetryOnFailedConnect(true),
nats.MaxReconnects(10),
nats.ReconnectWait(time.Second),
nats.ReconnectHandler(func(_ *nats.Conn) {
// Note that this will be invoked for the first asynchronous connect.
}))
if err != nil {
// Should not return an error even if it can't connect, but you still
// need to check in case there are some configuration errors.
}

// Flush connection to server, returns when all messages have been processed.
nc.Flush()
fmt.Println("All clear!")
Expand Down
56 changes: 48 additions & 8 deletions nats.go
Expand Up @@ -392,6 +392,15 @@ type Options struct {
// gradually disconnect all its connections before shuting down. This is
// often used in deployments when upgrading NATS Servers.
LameDuckModeHandler ConnHandler

// RetryOnFailedConnect sets the connection in reconnecting state right
// away if it can't connect to a server in the initial set. The
// MaxReconnect and ReconnectWait options are used for this process,
// similarly to when an established connection is disconnected.
// If a ReconnectHandler is set, it will be invoked when the connection
// is established, and if a ClosedHandler is set, it will be invoked if
// it fails to connect (after exhausting the MaxReconnect attempts).
RetryOnFailedConnect bool
}

const (
Expand Down Expand Up @@ -1000,6 +1009,16 @@ func LameDuckModeHandler(cb ConnHandler) Option {
}
}

// RetryOnFailedConnect sets the connection in reconnecting state right away
// if it can't connect to a server in the initial set.
// See RetryOnFailedConnect option for more details.
func RetryOnFailedConnect(retry bool) Option {
return func(o *Options) error {
o.RetryOnFailedConnect = retry
return nil
}
}

// Handler processing

// SetDisconnectHandler will set the disconnect event handler.
Expand Down Expand Up @@ -1588,11 +1607,25 @@ func (nc *Conn) connect() error {
}
}
}
nc.initc = false

if returnedErr == nil && nc.status != CONNECTED {
returnedErr = ErrNoServers
}

if returnedErr == nil {
nc.initc = false
} else if nc.Opts.RetryOnFailedConnect {
nc.setup()
nc.status = RECONNECTING
nc.pending = new(bytes.Buffer)
if nc.bw == nil {
nc.bw = nc.newBuffer()
}
nc.bw.Reset(nc.pending)
go nc.doReconnect(ErrNoServers)
returnedErr = nil
}

return returnedErr
}

Expand Down Expand Up @@ -1912,10 +1945,12 @@ func (nc *Conn) doReconnect(err error) {
nc.err = nil
// Perform appropriate callback if needed for a disconnect.
// DisconnectedErrCB has priority over deprecated DisconnectedCB
if nc.Opts.DisconnectedErrCB != nil {
nc.ach.push(func() { nc.Opts.DisconnectedErrCB(nc, err) })
} else if nc.Opts.DisconnectedCB != nil {
nc.ach.push(func() { nc.Opts.DisconnectedCB(nc) })
if !nc.initc {
if nc.Opts.DisconnectedErrCB != nil {
nc.ach.push(func() { nc.Opts.DisconnectedErrCB(nc, err) })
} else if nc.Opts.DisconnectedCB != nil {
nc.ach.push(func() { nc.Opts.DisconnectedCB(nc) })
}
}

// This is used to wait on go routines exit if we start them in the loop
Expand Down Expand Up @@ -2056,6 +2091,10 @@ func (nc *Conn) doReconnect(err error) {
// This is where we are truly connected.
nc.status = CONNECTED

// If we are here with a retry on failed connect, indicate that the
// initial connect is now complete.
nc.initc = false

// Queue up the reconnect callback.
if nc.Opts.ReconnectedCB != nil {
nc.ach.push(func() { nc.Opts.ReconnectedCB(nc) })
Expand Down Expand Up @@ -2532,7 +2571,7 @@ func (nc *Conn) processInfo(info string) error {
// did not include themselves in the async INFO protocol.
// If empty, do not remove the implicit servers from the pool.
if len(ncInfo.ConnectURLs) == 0 {
if ncInfo.LameDuckMode && nc.Opts.LameDuckModeHandler != nil {
if !nc.initc && ncInfo.LameDuckMode && nc.Opts.LameDuckModeHandler != nil {
nc.ach.push(func() { nc.Opts.LameDuckModeHandler(nc) })
}
return nil
Expand Down Expand Up @@ -2595,7 +2634,7 @@ func (nc *Conn) processInfo(info string) error {
nc.ach.push(func() { nc.Opts.DiscoveredServersCB(nc) })
}
}
if ncInfo.LameDuckMode && nc.Opts.LameDuckModeHandler != nil {
if !nc.initc && ncInfo.LameDuckMode && nc.Opts.LameDuckModeHandler != nil {
nc.ach.push(func() { nc.Opts.LameDuckModeHandler(nc) })
}
return nil
Expand Down Expand Up @@ -2776,7 +2815,8 @@ func (nc *Conn) publish(subj, reply string, hdr, data []byte) error {

// Proactively reject payloads over the threshold set by server.
msgSize := int64(len(data) + len(hdr))
if msgSize > nc.info.MaxPayload {
// Skip this check if we are not yet connected (RetryOnFailedConnect)
if !nc.initc && msgSize > nc.info.MaxPayload {
nc.mu.Unlock()
return ErrMaxPayload
}
Expand Down
2 changes: 2 additions & 0 deletions norace_test.go
Expand Up @@ -33,6 +33,7 @@ func TestNoRaceParseStateReconnectFunctionality(t *testing.T) {
opts.DisconnectedCB = func(_ *Conn) {
dch <- true
}
opts.NoCallbacksAfterClientClose = true

nc, errc := opts.Connect()
if errc != nil {
Expand Down Expand Up @@ -94,4 +95,5 @@ func TestNoRaceParseStateReconnectFunctionality(t *testing.T) {
t.Fatalf("Reconnect count incorrect: %d vs %d\n",
reconnectedCount, expectedReconnectCount)
}
nc.Close()
}

0 comments on commit 8a78c4e

Please sign in to comment.